提交 bffdb7a8 编写于 作者: D dapan1121

Merge remote-tracking branch 'origin/3.0' into feature/qnode

......@@ -5,6 +5,7 @@ AccessModifierOffset: -1
AlignAfterOpenBracket: Align
AlignConsecutiveAssignments: false
AlignConsecutiveDeclarations: true
AlignConsecutiveMacros: true
AlignEscapedNewlinesLeft: true
AlignOperands: true
AlignTrailingComments: true
......@@ -86,6 +87,5 @@ SpacesInSquareBrackets: false
Standard: Auto
TabWidth: 8
UseTab: Never
AlignConsecutiveDeclarations: true
...
......@@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock);
void* blockDataDestroy(SSDataBlock* pBlock);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
......
......@@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
return res;
}
void* destroyOutputBuf(SSDataBlock* pBlock) {
void* blockDataDestroy(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
......@@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
break;
}
case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
break;
}
case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break;
}
case OP_TableScan: {
......@@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator;
}
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
......@@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
}
}
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -5278,7 +5278,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->name = "TableScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false;
......@@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->prevRow);
}
......@@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p);
}
......@@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup(pInfo->pSet);
tfree(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
......
......@@ -13,7 +13,7 @@
namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0;
......@@ -22,40 +22,40 @@ void simpleTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getResBufSize(pResultBuf), 1024);
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId);
tFilePage* t = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf);
}
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
......@@ -68,31 +68,31 @@ void writeDownTest() {
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
......@@ -102,7 +102,7 @@ void writeDownTest() {
}
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
......@@ -112,41 +112,41 @@ void recyclePageTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseBufPage(pResultBuf, t4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t5 = getResBufPage(pResultBuf, pageId);
tFilePage* t5 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
*(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data
releaseResBufPage(pResultBuf, pBufPagex);
releaseBufPage(pResultBuf, pBufPagex);
tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1);
tFilePage* pBufPagex1 = getBufPage(pResultBuf, 1);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
......
......@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <assert.h>
#include <time.h>
#include "taos.h"
static int running = 1;
static void msg_process(tmq_message_t* message) {
tmqShowMsg(message);
}
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
int32_t init_env() {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
......@@ -44,29 +42,28 @@ int32_t init_env() {
}
taos_free_result(pRes);
/*pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
/*pRes = taos_query(pConn, "create table if not exists tu using st1 tags(1)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
/*pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");*/
/*if (taos_errno(pRes) != 0) {*/
/*printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));*/
/*return -1;*/
/*}*/
/*taos_free_result(pRes);*/
const char* sql = "select * from st1";
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu1 using st1 tags(1)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)");
if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1;
}
taos_free_result(pRes);
const char* sql = "select * from tu1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
......@@ -91,11 +88,6 @@ tmq_t* build_consumer() {
tmq_conf_set(conf, "group.id", "tg2");
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
return tmq;
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
return NULL;
}
tmq_list_t* build_topic_list() {
......@@ -104,8 +96,7 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
void basic_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
......@@ -113,12 +104,12 @@ void basic_consume_loop(tmq_t *tmq,
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
/*int32_t cnt = 0;*/
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
cnt++;
/*cnt++;*/
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
......@@ -135,8 +126,7 @@ void basic_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t *tmq,
tmq_list_t *topics) {
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
......@@ -148,13 +138,12 @@ void sync_consume_loop(tmq_t *tmq,
}
while (running) {
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500);
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0)
tmq_commit(tmq, NULL, 0);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
}
......@@ -165,11 +154,48 @@ void sync_consume_loop(tmq_t *tmq,
fprintf(stderr, "%% Consumer closed\n");
}
int main() {
void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t batchCnt = 0;
int32_t skipLogNum = 0;
clock_t startTime = clock();
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
batchCnt++;
skipLogNum += tmqGetSkipLogNum(tmqmessage);
/*msg_process(tmqmessage);*/
tmq_message_destroy(tmqmessage);
} else {
break;
}
}
clock_t endTime = clock();
printf("log batch cnt: %d, skip log cnt: %d, time used:%f s\n", batchCnt, skipLogNum,
(double)(endTime - startTime) / CLOCKS_PER_SEC);
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
int code;
if (argc > 1) {
printf("env init\n");
code = init_env();
}
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
/*perf_loop(tmq, topic_list);*/
basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/
}
......@@ -16,8 +16,8 @@
#ifndef TDENGINE_TAOS_H
#define TDENGINE_TAOS_H
#include <stdint.h>
#include <stdbool.h>
#include <stdint.h>
#ifdef __cplusplus
extern "C" {
......@@ -85,22 +85,22 @@ typedef struct taosField {
} TAOS_FIELD;
#ifdef _TD_GO_DLL_
#define DLL_EXPORT __declspec(dllexport)
#define DLL_EXPORT __declspec(dllexport)
#else
#define DLL_EXPORT
#define DLL_EXPORT
#endif
typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code);
typedef struct TAOS_BIND {
int buffer_type;
void * buffer;
void *buffer;
uintptr_t buffer_length; // unused
uintptr_t *length;
int * is_null;
int *is_null;
int is_unsigned; // unused
int * error; // unused
int *error; // unused
union {
int64_t ts;
int8_t b;
......@@ -128,7 +128,8 @@ typedef struct TAOS_MULTI_BIND {
DLL_EXPORT void taos_cleanup(void);
DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen,
const char *db, int dbLen, uint16_t port);
DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port);
DLL_EXPORT void taos_close(TAOS *taos);
......@@ -136,21 +137,21 @@ const char *taos_data_type(int type);
DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes);
DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, int colIdx);
DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind);
DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx);
DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES * taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt);
DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt);
DLL_EXPORT char * taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt);
DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql);
DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen);
......@@ -170,7 +171,7 @@ DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col);
DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows);
DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql);
DLL_EXPORT int* taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res);
DLL_EXPORT const char *taos_get_server_info(TAOS *taos);
DLL_EXPORT const char *taos_get_client_info();
......@@ -181,8 +182,9 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param);
DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param);
typedef void (*__taos_sub_fn_t)(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS* taos, int restart, const char* topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval);
typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code);
DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp,
void *param, int interval);
DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub);
DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress);
......@@ -190,8 +192,8 @@ DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList);
DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision);
DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
/* --------------------------TMQ INTERFACE------------------------------- */
......@@ -217,8 +219,8 @@ DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *);
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t* tmq_message);
DLL_EXPORT const char* tmq_err2str(tmq_resp_err_t);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list);
......@@ -227,7 +229,7 @@ DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
......@@ -251,8 +253,9 @@ DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
//temporary used function for demo only
void tmqShowMsg(tmq_message_t* tmq_message);
// temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#ifdef __cplusplus
}
......
......@@ -16,27 +16,32 @@
#ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
#include "tmsg.h"
#include "tarray.h"
#include "tmsg.h"
#include "tvariant.h"
//typedef struct STimeWindow {
// typedef struct STimeWindow {
// TSKEY skey;
// TSKEY ekey;
//} STimeWindow;
// } STimeWindow;
//typedef struct {
// typedef struct {
// int32_t dataLen;
// char name[TSDB_TABLE_FNAME_LEN];
// char *data;
//} STagData;
// } STagData;
//typedef struct SSchema {
// typedef struct SSchema {
// uint8_t type;
// char name[TSDB_COL_NAME_LEN];
// int16_t colId;
// int16_t bytes;
//} SSchema;
// } SSchema;
#define TMQ_REQ_TYPE_COMMIT_ONLY 0
#define TMQ_REQ_TYPE_CONSUME_ONLY 1
......@@ -80,12 +85,22 @@ typedef struct SSDataBlock {
SDataBlockInfo info;
} SSDataBlock;
typedef struct SVarColAttr {
int32_t *offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data
uint32_t allocLen; // allocated buffer size
} SVarColAttr;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed
char *nullbitmap;//
bool hasNull;// if current column data has null value.
char *pData; // the corresponding block data in memory
union {
char *nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta;
};
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
......@@ -110,7 +125,7 @@ static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlo
return tlen;
}
static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) {
int32_t sz;
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
......@@ -127,7 +142,7 @@ static FORCE_INLINE void* tDecodeDataBlock(void* buf, SSDataBlock* pBlock) {
buf = taosDecodeBinary(buf, (void**)&data.pData, colSz);
taosArrayPush(pBlock->pDataBlock, &data);
}
return buf;
return (void*)buf;
}
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
......@@ -146,7 +161,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
}
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pBlock = (SSDataBlock*) taosArrayGet(pRsp->pBlockData, i);
SSDataBlock* pBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
tlen += tEncodeDataBlock(buf, pBlock);
}
return tlen;
......@@ -179,19 +194,18 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
return;
}
//int32_t numOfOutput = pBlock->info.numOfCols;
// int32_t numOfOutput = pBlock->info.numOfCols;
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
for(int32_t i = 0; i < sz; ++i) {
for (int32_t i = 0; i < sz; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
//tfree(pBlock);
// tfree(pBlock);
}
static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
if (pRsp->schemas) {
if (pRsp->schemas->nCols) {
......@@ -199,11 +213,11 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) {
}
free(pRsp->schemas);
}
taosArrayDestroyEx(pRsp->pBlockData, (void(*)(void*))tDeleteSSDataBlock);
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
pRsp->pBlockData = NULL;
//for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
//SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
//tDeleteSSDataBlock(pDataBlock);
// for (int i = 0; i < taosArrayGetSize(pRsp->pBlockData); i++) {
// SSDataBlock* pDataBlock = (SSDataBlock*)taosArrayGet(pRsp->pBlockData, i);
// tDeleteSSDataBlock(pDataBlock);
//}
}
......@@ -245,7 +259,7 @@ typedef struct SSqlExpr {
typedef struct SExprInfo {
struct SSqlExpr base;
struct tExprNode *pExpr;
struct tExprNode* pExpr;
} SExprInfo;
typedef struct SStateWindow {
......@@ -262,4 +276,8 @@ typedef struct SSessionWindow {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_COMMON_H
......@@ -23,7 +23,7 @@ extern "C" {
#include "taos.h"
#include "tdef.h"
typedef uint64_t tb_uid_t;
typedef int64_t tb_uid_t;
#define TSWINDOW_INITIALIZER ((STimeWindow){INT64_MIN, INT64_MAX})
#define TSWINDOW_DESC_INITIALIZER ((STimeWindow){INT64_MAX, INT64_MIN})
......
......@@ -7,12 +7,22 @@ extern "C" {
#include "os.h"
#include "tmsg.h"
#include "common.h"
typedef struct SCorEpSet {
int32_t version;
SEpSet epSet;
} SCorEpSet;
typedef struct SBlockOrderInfo {
int32_t order;
int32_t colIndex;
SColumnInfoData *pColData;
// int32_t type;
// int32_t bytes;
// bool hasNull;
} SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
......@@ -21,6 +31,77 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
SEpSet getEpSet_s(SCorEpSet *pEpSet);
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
#define colDataSetNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
return false;
}
if (pColAgg != NULL) {
if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true;
} else if (pColAgg->numOfNull == 0) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return false;
}
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.offset[row] == -1;
} else {
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void *blockDataDestroy(SSDataBlock *pBlock);
#ifdef __cplusplus
}
#endif
......
此差异已折叠。
......@@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// clang-format off
#if 0
#undef TD_MSG_INFO_
#undef TD_MSG_NUMBER_
......@@ -82,7 +84,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_DND_CREATE_VNODE, "dnode-create-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_ALTER_VNODE, "dnode-alter-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_DROP_VNODE, "dnode-drop-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_AUTH_VNODE, "dnode-auth-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_SYNC_VNODE, "dnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_COMPACT_VNODE, "dnode-compact-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_DND_CONFIG_DNODE, "dnode-config-dnode", NULL, NULL)
......@@ -137,12 +138,12 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMqTmrMsg, SMqTmrMsg)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-timer", SMTimerReq, SMTimerReq)
TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg)
// Requests handled by VNODE
......@@ -191,3 +192,4 @@ enum {
TDMT_MAX
#endif
};
// clang-format on
......@@ -105,7 +105,6 @@ typedef struct SFuncExecEnv {
int32_t calcMemSize;
} SFuncExecEnv;
typedef void* FuncMgtHandle;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
......@@ -127,9 +126,7 @@ typedef struct SScalarFuncExecFuncs {
int32_t fmFuncMgtInit();
int32_t fmGetHandle(FuncMgtHandle* pHandle);
int32_t fmGetFuncInfo(FuncMgtHandle handle, const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
int32_t fmGetFuncInfo(const char* pFuncName, int32_t* pFuncId, int32_t* pFuncType);
int32_t fmGetFuncResultType(SFunctionNode* pFunc);
......
......@@ -49,7 +49,6 @@ typedef enum ENodeType {
QUERY_NODE_VALUE,
QUERY_NODE_OPERATOR,
QUERY_NODE_LOGIC_CONDITION,
QUERY_NODE_IS_NULL_CONDITION,
QUERY_NODE_FUNCTION,
QUERY_NODE_REAL_TABLE,
QUERY_NODE_TEMP_TABLE,
......@@ -62,6 +61,7 @@ typedef enum ENodeType {
QUERY_NODE_INTERVAL_WINDOW,
QUERY_NODE_NODE_LIST,
QUERY_NODE_FILL,
QUERY_NODE_COLUMN_REF,
// Only be used in parser module.
QUERY_NODE_RAW_EXPR,
......@@ -69,7 +69,11 @@ typedef enum ENodeType {
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR,
QUERY_NODE_SELECT_STMT,
QUERY_NODE_SHOW_STMT
QUERY_NODE_SHOW_STMT,
QUERY_NODE_LOGIC_PLAN_SCAN,
QUERY_NODE_LOGIC_PLAN_FILTER,
QUERY_NODE_LOGIC_PLAN_AGG
} ENodeType;
/**
......@@ -122,7 +126,8 @@ void nodesRewriteListPostOrder(SNodeList* pList, FNodeRewriter rewriter, void* p
bool nodesEqualNode(const SNode* a, const SNode* b);
void nodesCloneNode(const SNode* pNode);
SNode* nodesCloneNode(const SNode* pNode);
SNodeList* nodesCloneList(const SNodeList* pList);
int32_t nodesNodeToString(const SNode* pNode, char** pStr, int32_t* pLen);
int32_t nodesStringToNode(const char* pStr, SNode** pNode);
......
......@@ -37,7 +37,7 @@ typedef struct SDataType {
} SDataType;
typedef struct SExprNode {
ENodeType nodeType;
ENodeType type;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
SNodeList* pAssociationList;
......@@ -59,6 +59,11 @@ typedef struct SColumnNode {
SNode* pProjectRef;
} SColumnNode;
typedef struct SColumnRef {
ENodeType type;
int32_t slotId;
} SColumnRef;
typedef struct SValueNode {
SExprNode node; // QUERY_NODE_VALUE
char* literal;
......@@ -80,6 +85,10 @@ typedef enum EOperatorType {
OP_TYPE_DIV,
OP_TYPE_MOD,
// bit operator
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
// comparison operator
OP_TYPE_GREATER_THAN,
OP_TYPE_GREATER_EQUAL,
......@@ -93,8 +102,8 @@ typedef enum EOperatorType {
OP_TYPE_NOT_LIKE,
OP_TYPE_MATCH,
OP_TYPE_NMATCH,
OP_TYPE_ISNULL,
OP_TYPE_NOTNULL,
OP_TYPE_IS_NULL,
OP_TYPE_IS_NOT_NULL,
OP_TYPE_BIT_AND,
OP_TYPE_BIT_OR,
......@@ -117,17 +126,11 @@ typedef enum ELogicConditionType {
} ELogicConditionType;
typedef struct SLogicConditionNode {
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
SExprNode node; // QUERY_NODE_LOGIC_CONDITION
ELogicConditionType condType;
SNodeList* pParameterList;
} SLogicConditionNode;
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNull;
} SIsNullCondNode;
typedef struct SNodeListNode {
ENodeType type; // QUERY_NODE_NODE_LIST
SNodeList* pNodeList;
......@@ -142,7 +145,7 @@ typedef struct SFunctionNode {
} SFunctionNode;
typedef struct STableNode {
ENodeType type;
SExprNode node;
char dbName[TSDB_DB_NAME_LEN];
char tableName[TSDB_TABLE_NAME_LEN];
char tableAlias[TSDB_TABLE_NAME_LEN];
......
......@@ -135,7 +135,7 @@ typedef struct SVgDataBlocks {
SVgroupInfo vg;
int32_t numOfTables; // number of tables in current submit block
uint32_t size;
char *pData; // SMsgDesc + SSubmitMsg + SSubmitBlk + ...
char *pData; // SMsgDesc + SSubmitReq + SSubmitBlk + ...
} SVgDataBlocks;
typedef struct SVnodeModifOpStmtInfo {
......
......@@ -24,7 +24,7 @@
#endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan)
......
......@@ -459,6 +459,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260A) //Not a GROUP BY expression
#define TSDB_CODE_PAR_NOT_SELECTED_EXPRESSION TAOS_DEF_ERROR_CODE(0, 0x260B) //Not SELECTed expression
#define TSDB_CODE_PAR_NOT_SINGLE_GROUP TAOS_DEF_ERROR_CODE(0, 0x260C) //Not a single-group group function
#define TSDB_CODE_PAR_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x260D) //Out of memory
#ifdef __cplusplus
}
......
......@@ -37,7 +37,7 @@ static FORCE_INLINE int taosEncodeFixedU8(void **buf, uint8_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) {
static FORCE_INLINE void *taosDecodeFixedU8(const void *buf, uint8_t *value) {
*value = ((uint8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
......@@ -51,7 +51,7 @@ static FORCE_INLINE int taosEncodeFixedI8(void **buf, int8_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) {
static FORCE_INLINE void *taosDecodeFixedI8(const void *buf, int8_t *value) {
*value = ((int8_t *)buf)[0];
return POINTER_SHIFT(buf, sizeof(*value));
}
......@@ -71,7 +71,7 @@ static FORCE_INLINE int taosEncodeFixedU16(void **buf, uint16_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeFixedU16(const void *buf, uint16_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
......@@ -87,9 +87,9 @@ static FORCE_INLINE int taosEncodeFixedI16(void **buf, int16_t value) {
return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) {
static FORCE_INLINE void *taosDecodeFixedI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeFixedU16(buf, &tvalue);
void *ret = taosDecodeFixedU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
......@@ -111,7 +111,7 @@ static FORCE_INLINE int taosEncodeFixedU32(void **buf, uint32_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) {
static FORCE_INLINE void *taosDecodeFixedU32(const void *buf, uint32_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
......@@ -129,9 +129,9 @@ static FORCE_INLINE int taosEncodeFixedI32(void **buf, int32_t value) {
return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) {
static FORCE_INLINE void *taosDecodeFixedI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeFixedU32(buf, &tvalue);
void *ret = taosDecodeFixedU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
......@@ -158,7 +158,7 @@ static FORCE_INLINE int taosEncodeFixedU64(void **buf, uint64_t value) {
return (int)sizeof(value);
}
static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeFixedU64(const void *buf, uint64_t *value) {
if (IS_LITTLE_ENDIAN()) {
memcpy(value, buf, sizeof(*value));
} else {
......@@ -180,9 +180,9 @@ static FORCE_INLINE int taosEncodeFixedI64(void **buf, int64_t value) {
return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) {
static FORCE_INLINE void *taosDecodeFixedI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeFixedU64(buf, &tvalue);
void *ret = taosDecodeFixedU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
......@@ -205,7 +205,7 @@ static FORCE_INLINE int taosEncodeVariantU16(void **buf, uint16_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) {
static FORCE_INLINE void *taosDecodeVariantU16(const void *buf, uint16_t *value) {
int i = 0;
uint16_t tval = 0;
*value = 0;
......@@ -228,9 +228,9 @@ static FORCE_INLINE int taosEncodeVariantI16(void **buf, int16_t value) {
return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) {
static FORCE_INLINE void *taosDecodeVariantI16(const void *buf, int16_t *value) {
uint16_t tvalue = 0;
void * ret = taosDecodeVariantU16(buf, &tvalue);
void *ret = taosDecodeVariantU16(buf, &tvalue);
*value = ZIGZAGD(int16_t, tvalue);
return ret;
}
......@@ -253,7 +253,7 @@ static FORCE_INLINE int taosEncodeVariantU32(void **buf, uint32_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) {
static FORCE_INLINE void *taosDecodeVariantU32(const void *buf, uint32_t *value) {
int i = 0;
uint32_t tval = 0;
*value = 0;
......@@ -276,9 +276,9 @@ static FORCE_INLINE int taosEncodeVariantI32(void **buf, int32_t value) {
return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) {
static FORCE_INLINE void *taosDecodeVariantI32(const void *buf, int32_t *value) {
uint32_t tvalue = 0;
void * ret = taosDecodeVariantU32(buf, &tvalue);
void *ret = taosDecodeVariantU32(buf, &tvalue);
*value = ZIGZAGD(int32_t, tvalue);
return ret;
}
......@@ -301,7 +301,7 @@ static FORCE_INLINE int taosEncodeVariantU64(void **buf, uint64_t value) {
return i + 1;
}
static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) {
static FORCE_INLINE void *taosDecodeVariantU64(const void *buf, uint64_t *value) {
int i = 0;
uint64_t tval = 0;
*value = 0;
......@@ -324,9 +324,9 @@ static FORCE_INLINE int taosEncodeVariantI64(void **buf, int64_t value) {
return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value));
}
static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) {
static FORCE_INLINE void *taosDecodeVariantI64(const void *buf, int64_t *value) {
uint64_t tvalue = 0;
void * ret = taosDecodeVariantU64(buf, &tvalue);
void *ret = taosDecodeVariantU64(buf, &tvalue);
*value = ZIGZAGD(int64_t, tvalue);
return ret;
}
......@@ -346,7 +346,7 @@ static FORCE_INLINE int taosEncodeString(void **buf, const char *value) {
return tlen;
}
static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
static FORCE_INLINE void *taosDecodeString(const void *buf, char **value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
......@@ -360,7 +360,7 @@ static FORCE_INLINE void *taosDecodeString(void *buf, char **value) {
return POINTER_SHIFT(buf, size);
}
static FORCE_INLINE void *taosDecodeStringTo(void *buf, char *value) {
static FORCE_INLINE void *taosDecodeStringTo(const void *buf, char *value) {
uint64_t size = 0;
buf = taosDecodeVariantU64(buf, &size);
......@@ -384,8 +384,7 @@ static FORCE_INLINE int taosEncodeBinary(void **buf, const void *value, int32_t
return tlen;
}
static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valueLen) {
static FORCE_INLINE void *taosDecodeBinary(const void *buf, void **value, int32_t valueLen) {
*value = malloc((size_t)valueLen);
if (*value == NULL) return NULL;
memcpy(*value, buf, (size_t)valueLen);
......@@ -393,8 +392,7 @@ static FORCE_INLINE void *taosDecodeBinary(void *buf, void **value, int32_t valu
return POINTER_SHIFT(buf, valueLen);
}
static FORCE_INLINE void *taosDecodeBinaryTo(void *buf, void *value, int32_t valueLen) {
static FORCE_INLINE void *taosDecodeBinaryTo(const void *buf, void *value, int32_t valueLen) {
memcpy(value, buf, (size_t)valueLen);
return POINTER_SHIFT(buf, valueLen);
}
......
......@@ -22,28 +22,31 @@ extern "C" {
typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef struct SLoserTreeNode {
typedef struct STreeNode {
int32_t index;
void *pData;
} SLoserTreeNode;
void *pData; // TODO remove it?
} STreeNode;
typedef struct SLoserTreeInfo {
int32_t numOfEntries;
int32_t totalEntries;
typedef struct SMultiwayMergeTreeInfo {
int32_t numOfSources;
int32_t totalSources;
__merge_compare_fn_t comparFn;
void * param;
SLoserTreeNode *pNode;
} SLoserTreeInfo;
struct STreeNode *pNode;
} SMultiwayMergeTreeInfo;
uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
#define tMergeTreeGetChosenIndex(t_) ((t_)->pNode[0].index)
#define tMergeTreeGetAdjustIndex(t_) (tMergeTreeGetChosenIndex(t_) + (t_)->numOfSources)
void tLoserTreeInit(SLoserTreeInfo *pTree);
int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeAdjust(SLoserTreeInfo *pTree, int32_t idx);
void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree);
void tLoserTreeRebuild(SLoserTreeInfo *pTree);
void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tLoserTreeDisplay(SLoserTreeInfo *pTree);
void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);
#ifdef __cplusplus
}
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TPAGEDFILE_H
#define TDENGINE_TPAGEDFILE_H
#ifndef TDENGINE_TPAGEDBUF_H
#define TDENGINE_TPAGEDBUF_H
#ifdef __cplusplus
extern "C" {
......@@ -26,57 +26,10 @@ extern "C" {
#include "tlockfree.h"
typedef struct SArray* SIDList;
typedef struct SPageDiskInfo {
int32_t offset;
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
SListNode* pn; // point to list node
int32_t pageId;
SPageDiskInfo info;
void* pData;
bool used; // set current page is in used
} SPageInfo;
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SResultBufStatis {
int32_t flushBytes;
int32_t loadBytes;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfPages;
int64_t totalBufSize;
int64_t fileSize; // disk file size
FILE* file;
int32_t allocateId; // allocated page id
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table
SHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage {
......@@ -84,76 +37,84 @@ typedef struct SFilePage {
char data[];
} SFilePage;
typedef struct SDiskbasedBufStatis {
int64_t flushBytes;
int64_t loadBytes;
int32_t loadPages;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SDiskbasedBufStatis;
/**
* create disk-based result buffer
* @param pResultBuf
* @param pBuf
* @param rowSize
* @param pagesize
* @param inMemPages
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/**
*
* @param pResultBuf
* @param pBuf
* @param groupId
* @param pageId
* @return
*/
SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @param pBuf
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
/**
* get the specified buffer page by id
* @param pResultBuf
* @param pBuf
* @param id
* @return
*/
SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
/**
* release the referenced buf pages
* @param pResultBuf
* @param pBuf
* @param page
*/
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
void releaseBufPage(SDiskbasedBuf* pBuf, void* page);
/**
*
* @param pResultBuf
* @param pBuf
* @param pi
*/
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi);
void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @param pBuf
* @return
*/
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf);
size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @param pBuf
* @return
*/
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf);
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf);
/**
* destroy result buffer
* @param pResultBuf
* @param pBuf
*/
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
void destroyResultBuf(SDiskbasedBuf* pBuf);
/**
*
......@@ -162,8 +123,49 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
*/
SPageInfo* getLastPageInfo(SIDList pList);
/**
*
* @param pPgInfo
* @return
*/
int32_t getPageId(const SPageInfo* pPgInfo);
/**
* Return the buffer page size.
* @param pBuf
* @return
*/
int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
* @return
*/
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo
* @param dirty
*/
void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
/**
* Print the statistics when closing this buffer
* @param pBuf
*/
void printStatisBeforeClose(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
*/
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TPAGEDFILE_H
#endif // TDENGINE_TPAGEDBUF_H
......@@ -4,4 +4,3 @@ add_subdirectory(common)
add_subdirectory(libs)
add_subdirectory(client)
add_subdirectory(dnode)
\ No newline at end of file
add_subdirectory(nodes)
\ No newline at end of file
......@@ -28,16 +28,17 @@ static int32_t hbMqHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp)
}
static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
while (msgLen < valueLen) {
SUseDbRsp *rsp = (SUseDbRsp *)((char *)value + msgLen);
rsp->vgVersion = ntohl(rsp->vgVersion);
rsp->vgNum = ntohl(rsp->vgNum);
rsp->uid = be64toh(rsp->uid);
SUseDbBatchRsp batchUseRsp = {0};
if (tDeserializeSUseDbBatchRsp(value, valueLen, &batchUseRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(batchUseRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SUseDbRsp *rsp = taosArrayGet(batchUseRsp.pArray, i);
tscDebug("hb db rsp, db:%s, vgVersion:%d, uid:%"PRIx64, rsp->db, rsp->vgVersion, rsp->uid);
if (rsp->vgVersion < 0) {
......@@ -52,16 +53,9 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < rsp->vgNum; ++i) {
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
}
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
for (int32_t j = 0; j < rsp->vgNum; ++j) {
SVgroupInfo *pInfo = taosArrayGet(rsp->pVgroupInfos, j);
if (taosHashPut(vgInfo.vgHash, &pInfo->vgId, sizeof(int32_t), pInfo, sizeof(SVgroupInfo)) != 0) {
tscError("hash push failed, errno:%d", errno);
taosHashCleanup(vgInfo.vgHash);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -74,66 +68,44 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
if (code) {
return code;
}
msgLen += sizeof(SUseDbRsp) + rsp->vgNum * sizeof(SVgroupInfo);
}
tFreeSUseDbBatchRsp(&batchUseRsp);
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
int32_t schemaNum = 0;
while (msgLen < valueLen) {
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
STableMetaBatchRsp batchMetaRsp = {0};
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid);
rsp->dbId = be64toh(rsp->dbId);
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);
if (rsp->numOfColumns < 0) {
schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
} else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
rsp->numOfTags = ntohl(rsp->numOfTags);
rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion);
rsp->tuid = be64toh(rsp->tuid);
rsp->vgId = ntohl(rsp->vgId);
SSchema* pSchema = rsp->pSchema;
schemaNum = rsp->numOfColumns + rsp->numOfTags;
for (int i = 0; i < schemaNum; ++i) {
pSchema->bytes = ntohl(pSchema->bytes);
pSchema->colId = ntohl(pSchema->colId);
pSchema++;
}
if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
return TSDB_CODE_TSC_INVALID_VALUE;
}
catalogUpdateSTableMeta(pCatalog, rsp);
}
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
}
tFreeSTableMetaBatchRsp(&batchMetaRsp);
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
......@@ -199,9 +171,10 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
tfree(param);
return -1;
}
char *key = (char *)param;
SClientHbBatchRsp pRsp = {0};
tDeserializeSClientHbBatchRsp(pMsg->pData, &pRsp);
tDeserializeSClientHbBatchRsp(pMsg->pData, pMsg->len, &pRsp);
int32_t rspNum = taosArrayGetSize(pRsp.rsps);
......@@ -342,7 +315,7 @@ void hbFreeReq(void *req) {
SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
SClientHbBatchReq* pBatchReq = malloc(sizeof(SClientHbBatchReq));
SClientHbBatchReq* pBatchReq = calloc(1, sizeof(SClientHbBatchReq));
if (pBatchReq == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
......@@ -414,7 +387,7 @@ static void* hbThreadFunc(void* param) {
if (pReq == NULL) {
continue;
}
int tlen = tSerializeSClientHbBatchReq(NULL, pReq);
int tlen = tSerializeSClientHbBatchReq(NULL, 0, pReq);
void *buf = malloc(tlen);
if (buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -422,8 +395,7 @@ static void* hbThreadFunc(void* param) {
hbClearReqInfo(pAppHbMgr);
break;
}
void *abuf = buf;
tSerializeSClientHbBatchReq(&abuf, pReq);
tSerializeSClientHbBatchReq(buf, tlen, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......
......@@ -9,7 +9,7 @@
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tpagedbuf.h"
#include "tref.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
......@@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
return pTscObj;
}
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest;
SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
if (pConnect == NULL) {
tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
STscObj *pObj = pRequest->pTscObj;
SConnectReq connectReq = {0};
STscObj* pObj = pRequest->pTscObj;
char* db = getDbOfConnection(pObj);
if (db != NULL) {
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
tstrncpy(connectReq.db, db, sizeof(connectReq.db));
}
tfree(db);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = malloc(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
pMsgSendInfo->msgInfo.pData = pConnect;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.pData = pReq;
return pMsgSendInfo;
}
......
......@@ -20,14 +20,14 @@
#include "clientLog.h"
#include "catalog.h"
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code;
terrno = code;
}
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
......@@ -36,7 +36,7 @@ int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
free(pMsg->pData);
......@@ -45,41 +45,35 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
STscObj *pTscObj = pRequest->pTscObj;
STscObj* pTscObj = pRequest->pTscObj;
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
pConnect->acctId = htonl(pConnect->acctId);
pConnect->connId = htonl(pConnect->connId);
pConnect->clusterId = htobe64(pConnect->clusterId);
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
assert(connectRsp.epSet.numOfEps > 0);
assert(pConnect->epSet.numOfEps > 0);
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port);
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
}
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
}
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.eps[i].fqdn,
pConnect->epSet.eps[i].port, pTscObj->id);
}
pTscObj->connId = pConnect->connId;
pTscObj->acctId = pConnect->acctId;
tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver));
pTscObj->connId = connectRsp.connId;
pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
// update the appInstInfo
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns);
free(pMsg->pData);
......@@ -97,14 +91,15 @@ SMsgSendInfo* buildMsgInfoImpl(SRequestObj *pRequest) {
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE || pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) {
if (pRequest->type == TDMT_MND_SHOW_RETRIEVE) {
SRetrieveTableReq* pRetrieveMsg = calloc(1, sizeof(SRetrieveTableReq));
if (pRetrieveMsg == NULL) {
return NULL;
}
SRetrieveTableReq retrieveReq = {0};
retrieveReq.showId = pRequest->body.showInfo.execId;
pRetrieveMsg->showId = htobe64(pRequest->body.showInfo.execId);
pMsgSendInfo->msgInfo.pData = pRetrieveMsg;
pMsgSendInfo->msgInfo.len = sizeof(SRetrieveTableReq);
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &retrieveReq);
void* pReq = malloc(contLen);
tSerializeSRetrieveTableReq(pReq, contLen, &retrieveReq);
pMsgSendInfo->msgInfo.pData = pReq;
pMsgSendInfo->msgInfo.len = contLen;
} else {
SVShowTablesFetchReq* pFetchMsg = calloc(1, sizeof(SVShowTablesFetchReq));
if (pFetchMsg == NULL) {
......@@ -134,39 +129,29 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
pShow->showId = htobe64(pShow->showId);
STableMetaRsp *pMetaMsg = &(pShow->tableMeta);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
SShowRsp showRsp = {0};
tDeserializeSShowRsp(pMsg->pData, pMsg->len, &showRsp);
STableMetaRsp *pMetaMsg = &showRsp.tableMeta;
SSchema* pSchema = pMetaMsg->pSchema;
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++;
}
pSchema = pMetaMsg->pSchema;
tfree(pRequest->body.resInfo.pRspMsg);
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
if (pResInfo->fields == NULL) {
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
pFields[i].type = pSchema[i].type;
pFields[i].bytes = pSchema[i].bytes;
SSchema* pSchema = &pMetaMsg->pSchemas[i];
tstrncpy(pFields[i].name, pSchema->name, tListLen(pFields[i].name));
pFields[i].type = pSchema->type;
pFields[i].bytes = pSchema->bytes;
}
pResInfo->fields = pFields;
}
pResInfo->numOfCols = pMetaMsg->numOfColumns;
pRequest->body.showInfo.execId = pShow->showId;
pRequest->body.showInfo.execId = showRsp.showId;
tFreeSShowRsp(&showRsp);
// todo
if (pRequest->type == TDMT_VND_SHOW_TABLES) {
......@@ -264,9 +249,13 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SUseDbRsp* pUseDbRsp = (SUseDbRsp*) pMsg->pData;
SUseDbRsp usedbRsp = {0};
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
SName name = {0};
tNameFromString(&name, pUseDbRsp->db, T_NAME_ACCT|T_NAME_DB);
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
tFreeSUsedbRsp(&usedbRsp);
char db[TSDB_DB_NAME_LEN] = {0};
tNameGetDbName(&name, db);
......@@ -300,14 +289,12 @@ int32_t processDropDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SDropDbRsp *rsp = (SDropDbRsp *)pMsg->pData;
struct SCatalog *pCatalog = NULL;
rsp->uid = be64toh(rsp->uid);
SDropDbRsp dropdbRsp = {0};
tDeserializeSDropDbRsp(pMsg->pData, pMsg->len, &dropdbRsp);
struct SCatalog* pCatalog = NULL;
catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
catalogRemoveDB(pCatalog, rsp->db, rsp->uid);
catalogRemoveDB(pCatalog, dropdbRsp.db, dropdbRsp.uid);
tsem_post(&pRequest->body.rspSem);
return code;
......
......@@ -25,7 +25,7 @@
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tpagedbuf.h"
#include "tref.h"
struct tmq_list_t {
......@@ -33,6 +33,7 @@ struct tmq_list_t {
int32_t tot;
char* elems[];
};
struct tmq_topic_vgroup_t {
char* topic;
int32_t vgId;
......@@ -48,14 +49,17 @@ struct tmq_topic_vgroup_list_t {
struct tmq_conf_t {
char clientId[256];
char groupId[256];
bool auto_commit;
/*char* ip;*/
/*uint16_t port;*/
tmq_commit_cb* commit_cb;
};
struct tmq_t {
// conf
char groupId[256];
char clientId[256];
bool autoCommit;
SRWLatch lock;
int64_t consumerId;
int32_t epoch;
......@@ -94,25 +98,25 @@ typedef struct SMqClientTopic {
SArray* vgs; // SArray<SMqClientVg>
} SMqClientTopic;
typedef struct SMqSubscribeCbParam {
typedef struct {
tmq_t* tmq;
tsem_t rspSem;
tmq_resp_err_t rspErr;
} SMqSubscribeCbParam;
typedef struct SMqAskEpCbParam {
typedef struct {
tmq_t* tmq;
int32_t wait;
} SMqAskEpCbParam;
typedef struct SMqConsumeCbParam {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
tmq_message_t** retMsg;
tsem_t rspSem;
} SMqConsumeCbParam;
typedef struct SMqCommitCbParam {
typedef struct {
tmq_t* tmq;
SMqClientVg* pVg;
int32_t async;
......@@ -121,6 +125,7 @@ typedef struct SMqCommitCbParam {
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = calloc(1, sizeof(tmq_conf_t));
conf->auto_commit = false;
return conf;
}
......@@ -131,11 +136,24 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
return TMQ_CONF_OK;
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
return TMQ_CONF_OK;
}
if (strcmp(key, "enable.auto.commit") == 0) {
if (strcmp(value, "true") == 0) {
conf->auto_commit = true;
return TMQ_CONF_OK;
} else if (strcmp(value, "false") == 0) {
conf->auto_commit = false;
return TMQ_CONF_OK;
} else {
return TMQ_CONF_INVALID;
}
}
return TMQ_CONF_UNKNOWN;
}
tmq_list_t* tmq_list_new() {
......@@ -155,6 +173,12 @@ int32_t tmq_list_append(tmq_list_t* ptr, const char* src) {
return 0;
}
tmq_resp_err_t tmq_reset_offset(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) {
// build msg
// send to mnode
return TMQ_RESP_ERR__SUCCESS;
}
int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param;
pParam->rspErr = code;
......@@ -182,11 +206,14 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
pTmq->pollCnt = 0;
pTmq->epoch = 0;
taosInitRWLatch(&pTmq->lock);
// set conf
strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId);
pTmq->autoCommit = conf->auto_commit;
pTmq->commit_cb = conf->commit_cb;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->consumerId = generateRequestId() & (((uint64_t)-1) >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq;
}
......@@ -357,25 +384,21 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&name, topicName, T_NAME_TABLE);
char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
tNameExtractFullName(&name, topicFname);
SCMCreateTopicReq req = {
.name = (char*)topicFname,
SMCreateTopicReq req = {
.igExists = 1,
.physicalPlan = (char*)pStr,
.sql = (char*)sql,
.logicalPlan = (char*)"no logic plan",
};
tNameExtractFullName(&name, req.name);
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
int tlen = tSerializeMCreateTopicReq(NULL, 0, &req);
void* buf = malloc(tlen);
if (buf == NULL) {
goto _return;
}
void* abuf = buf;
tSerializeSCMCreateTopicReq(&abuf, &req);
tSerializeMCreateTopicReq(buf, tlen, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
......@@ -452,6 +475,12 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return buf;
}
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0;
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
return pRsp->skipLogNum;
}
void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
......@@ -557,8 +586,15 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
// clang-format off
SMqClientVg clientVg = {
.pollCnt = 0, .committedOffset = -1, .currentOffset = -1, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet};
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet
};
// clang-format on
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
......@@ -649,17 +685,19 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == 0);
if (blocking_time < 0) blocking_time = 1;
if (blocking_time <= 0) blocking_time = 1;
if (blocking_time > 1000) blocking_time = 1000;
/*blocking_time = 1;*/
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
printf("over1\n");
usleep(blocking_time * 1000);
return NULL;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
if (taosArrayGetSize(pTopic->vgs) == 0) {
printf("over2\n");
usleep(blocking_time * 1000);
return NULL;
}
......@@ -670,7 +708,8 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, TMQ_REQ_TYPE_CONSUME_ONLY, pTopic, pVg);
int32_t reqType = tmq->autoCommit ? TMQ_REQ_TYPE_CONSUME_AND_COMMIT : TMQ_REQ_TYPE_CONSUME_ONLY;
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, reqType, pTopic, pVg);
if (pReq == NULL) {
ASSERT(false);
usleep(blocking_time * 1000);
......
此差异已折叠。
此差异已折叠。
#include <common.h>
#include "os.h"
#include "tutil.h"
......@@ -269,3 +270,4 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
......@@ -344,8 +344,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
UNUSED(ret);
}
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
int32_t ret = fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
ret = fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
// NOTE: mix types tags are not supported
size_t sz = 0;
......
#include <common.h>
#include <gtest/gtest.h>
#include <tep.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
......@@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) {
ASSERT_EQ(ret, -1);
}
TEST(testCase, Datablock_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(40, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (40/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(40, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char* str = "the value of: %d";
char buf[128] = {0};
char varbuf[128] = {0};
for(int32_t i = 0; i < 40; ++i) {
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
if (i&0x01) {
int32_t len = sprintf(buf, str, i);
STR_TO_VARSTR(varbuf, buf)
colDataAppend(p0, i, (const char*) &i, false);
colDataAppend(p1, i, (const char*) varbuf, false);
memset(varbuf, 0, sizeof(varbuf));
memset(buf, 0, sizeof(buf));
} else {
colDataAppend(p0, i, (const char*) &i, true);
colDataAppend(p1, i, (const char*) varbuf, true);
}
b->info.rows++;
}
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
for(int32_t i = 0; i < 40; ++i) {
if (i & 0x01) {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), false);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), false);
} else {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), true);
ASSERT_EQ(colDataIsNull(p0, b->info.rows, i, nullptr), true);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), true);
}
}
printf("binary column length:%d\n", *(int32_t*) p1->pData);
ASSERT_EQ(colDataGetNumOfCols(b), 2);
ASSERT_EQ(colDataGetNumOfRows(b), 40);
char* pData = colDataGet(p1, 3);
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0};
taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true);
blockDataDestroy(b);
taosArrayDestroy(pOrderInfo);
}
#if 0
TEST(testCase, non_var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
int32_t numOfRows = 1000000;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 1;
infoData1.info.type = TSDB_DATA_TYPE_TINYINT;
infoData1.info.colId = 2;
infoData1.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData1.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData1);
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
colDataAppend(p1, i, (const char*)&v, false);
b->info.rows++;
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, false, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#endif
TEST(testCase, var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
int32_t numOfRows = 1000000;
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(numOfRows, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char buf[41] = {0};
char buf1[100] = {0};
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
sprintf(buf, "the number of row:%d", i);
int32_t len = sprintf(buf1, buf, i);
STR_TO_VARSTR(buf1, buf)
colDataAppend(p1, i, buf1, false);
b->info.rows++;
memset(buf, 0, sizeof(buf));
memset(buf1, 0, sizeof(buf1));
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, true, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#pragma GCC diagnostic pop
\ No newline at end of file
......@@ -258,11 +258,14 @@ static int32_t dndDropBnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDCreateBnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateBnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to create bnode since %s", terrstr());
return -1;
......@@ -271,11 +274,14 @@ int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDDropBnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropBnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_BNODE_INVALID_OPTION;
dError("failed to drop bnode since %s", terrstr());
return -1;
......
......@@ -379,10 +379,9 @@ void dndSendStatusReq(SDnode *pDnode) {
req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
dndGetVnodeLoads(pDnode, req.pVloads);
int32_t contLen = tSerializeSStatusReq(NULL, &req);
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead;
tSerializeSStatusReq(&pBuf, &req);
tSerializeSStatusReq(pHead, contLen, &req);
taosArrayDestroy(req.pVloads);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
......@@ -395,7 +394,7 @@ void dndSendStatusReq(SDnode *pDnode) {
static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) {
SDnodeMgmt *pMgmt = &pDnode->dmgmt;
if (pMgmt->dnodeId == 0) {
dInfo("set dnodeId:%d clusterId:0x%" PRId64, pCfg->dnodeId, pCfg->clusterId);
dInfo("set dnodeId:%d clusterId:%" PRId64, pCfg->dnodeId, pCfg->clusterId);
taosWLockLatch(&pMgmt->latch);
pMgmt->dnodeId = pCfg->dnodeId;
pMgmt->clusterId = pCfg->clusterId;
......@@ -437,7 +436,8 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
}
} else {
SStatusRsp statusRsp = {0};
if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) {
if (pRsp->pCont != NULL && pRsp->contLen != 0 &&
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
pMgmt->dver = statusRsp.dver;
dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps);
......@@ -652,9 +652,6 @@ static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) {
case TDMT_DND_DROP_VNODE:
code = dndProcessDropVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_AUTH_VNODE:
code = dndProcessAuthVnodeReq(pDnode, pMsg);
break;
case TDMT_DND_SYNC_VNODE:
code = dndProcessSyncVnodeReq(pDnode, pMsg);
break;
......
......@@ -424,28 +424,22 @@ static int32_t dndDropMnode(SDnode *pDnode) {
return 0;
}
static SDCreateMnodeReq *dndParseCreateMnodeReq(SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = pReq->pCont;
pCreate->dnodeId = htonl(pCreate->dnodeId);
for (int32_t i = 0; i < pCreate->replica; ++i) {
pCreate->replicas[i].id = htonl(pCreate->replicas[i].id);
pCreate->replicas[i].port = htons(pCreate->replicas[i].port);
}
return pCreate;
}
int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateMnodeReq *pCreate = dndParseCreateMnodeReq(pReq);
SDCreateMnodeReq createReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pCreate->replica <= 1 || pCreate->dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.replica <= 1 || createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
}
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromReq(pDnode, &option, pCreate) != 0) {
if (dndBuildMnodeOptionFromReq(pDnode, &option, &createReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to create mnode since %s", terrstr());
return -1;
......@@ -464,16 +458,20 @@ int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDAlterMnodeReq *pAlter = dndParseCreateMnodeReq(pReq);
SDAlterMnodeReq alterReq = {0};
if (tDeserializeSDCreateMnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pAlter->dnodeId != dndGetDnodeId(pDnode)) {
if (alterReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
}
SMnodeOpt option = {0};
if (dndBuildMnodeOptionFromReq(pDnode, &option, pAlter) != 0) {
if (dndBuildMnodeOptionFromReq(pDnode, &option, &alterReq) != 0) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to alter mnode since %s", terrstr());
return -1;
......@@ -494,10 +492,13 @@ int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropMnodeReq *pDrop = pReq->pCont;
pDrop->dnodeId = htonl(pDrop->dnodeId);
SDDropMnodeReq dropReq = {0};
if (tDeserializeSMCreateDropMnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pDrop->dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_MNODE_INVALID_OPTION;
dError("failed to drop mnode since %s", terrstr());
return -1;
......
......@@ -264,11 +264,14 @@ static int32_t dndDropQnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDCreateQnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateQnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
return -1;
......@@ -277,11 +280,14 @@ int32_t dndProcessCreateQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDDropQnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropQnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_QNODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
return -1;
......
......@@ -258,11 +258,14 @@ static int32_t dndDropSnode(SDnode *pDnode) {
return 0;
}
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDCreateSnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDCreateSnodeReq createReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
return -1;
......@@ -271,11 +274,14 @@ int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
}
}
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) {
SDDropSnodeReq *pMsg = pRpcMsg->pCont;
pMsg->dnodeId = htonl(pMsg->dnodeId);
int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDDropSnodeReq dropReq = {0};
if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (pMsg->dnodeId != dndGetDnodeId(pDnode)) {
if (dropReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_SNODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
return -1;
......
......@@ -57,8 +57,6 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_SYNC_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessMgmtMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg;
......@@ -310,24 +308,29 @@ static int32_t dndRetrieveUserAuthInfo(void *parent, char *user, char *spi, char
return -1;
}
SAuthReq *pReq = rpcMallocCont(sizeof(SAuthReq));
tstrncpy(pReq->user, user, TSDB_USER_LEN);
SAuthReq authReq = {0};
tstrncpy(authReq.user, user, TSDB_USER_LEN);
int32_t contLen = tSerializeSAuthReq(NULL, 0, &authReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSAuthReq(pReq, contLen, &authReq);
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = sizeof(SAuthReq), .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcMsg = {.pCont = pReq, .contLen = contLen, .msgType = TDMT_MND_AUTH, .ahandle = (void *)9528};
SRpcMsg rpcRsp = {0};
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, pReq->spi, pReq->encrypt);
dTrace("user:%s, send user auth req to other mnodes, spi:%d encrypt:%d", user, authReq.spi, authReq.encrypt);
dndSendMsgToMnodeRecv(pDnode, &rpcMsg, &rpcRsp);
if (rpcRsp.code != 0) {
terrno = rpcRsp.code;
dError("user:%s, failed to get user auth from other mnodes since %s", user, terrstr());
} else {
SAuthRsp *pRsp = rpcRsp.pCont;
memcpy(secret, pRsp->secret, TSDB_PASSWORD_LEN);
memcpy(ckey, pRsp->ckey, TSDB_PASSWORD_LEN);
*spi = pRsp->spi;
*encrypt = pRsp->encrypt;
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, pRsp->spi, pRsp->encrypt);
SAuthRsp authRsp = {0};
tDeserializeSAuthReq(rpcRsp.pCont, rpcRsp.contLen, &authRsp);
memcpy(secret, authRsp.secret, TSDB_PASSWORD_LEN);
memcpy(ckey, authRsp.ckey, TSDB_PASSWORD_LEN);
*spi = authRsp.spi;
*encrypt = authRsp.encrypt;
dTrace("user:%s, success to get user auth from other mnodes, spi:%d encrypt:%d", user, authRsp.spi,
authRsp.encrypt);
}
rpcFreeCont(rpcRsp.pCont);
......
......@@ -15,8 +15,8 @@
#define _DEFAULT_SOURCE
#include "dndVnodes.h"
#include "dndTransport.h"
#include "dndMgmt.h"
#include "dndTransport.h"
typedef struct {
int32_t vgId;
......@@ -34,9 +34,9 @@ typedef struct {
int8_t dropped;
int8_t accessState;
uint64_t dbUid;
char * db;
char * path;
SVnode * pImpl;
char *db;
char *path;
SVnode *pImpl;
STaosQueue *pWriteQ;
STaosQueue *pSyncQ;
STaosQueue *pApplyQ;
......@@ -50,7 +50,7 @@ typedef struct {
int32_t failed;
int32_t threadIndex;
pthread_t thread;
SDnode * pDnode;
SDnode *pDnode;
SWrapperCfg *pCfgs;
} SVnodeThread;
......@@ -68,7 +68,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg);
static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId);
static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode);
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl);
static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode);
......@@ -81,7 +81,7 @@ static void dndCloseVnodes(SDnode *pDnode);
static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj * pVnode = NULL;
SVnodeObj *pVnode = NULL;
int32_t refCount = 0;
taosRLockLatch(&pMgmt->latch);
......@@ -112,7 +112,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) {
static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj));
SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj));
if (pVnode == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
......@@ -189,7 +189,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) {
void *pIter = taosHashIterate(pMgmt->hash, NULL);
while (pIter) {
SVnodeObj **ppVnode = pIter;
SVnodeObj * pVnode = *ppVnode;
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount);
......@@ -211,9 +211,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 30000;
char * content = calloc(1, maxLen + 1);
cJSON * root = NULL;
FILE * fp = NULL;
char *content = calloc(1, maxLen + 1);
cJSON *root = NULL;
FILE *fp = NULL;
char file[PATH_MAX + 20] = {0};
SWrapperCfg *pCfgs = NULL;
......@@ -254,7 +254,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_
}
for (int32_t i = 0; i < vnodesNum; ++i) {
cJSON * vnode = cJSON_GetArrayItem(vnodes, i);
cJSON *vnode = cJSON_GetArrayItem(vnodes, i);
SWrapperCfg *pCfg = &pCfgs[i];
cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId");
......@@ -326,7 +326,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
int32_t len = 0;
int32_t maxLen = 65536;
char * content = calloc(1, maxLen + 1);
char *content = calloc(1, maxLen + 1);
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n");
......@@ -368,8 +368,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) {
static void *dnodeOpenVnodeFunc(void *param) {
SVnodeThread *pThread = param;
SDnode * pDnode = pThread->pDnode;
SVnodesMgmt * pMgmt = &pDnode->vmgmt;
SDnode *pDnode = pThread->pDnode;
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum);
setThreadName("open-vnodes");
......@@ -383,7 +383,7 @@ static void *dnodeOpenVnodeFunc(void *param) {
dndReportStartup(pDnode, "open-vnodes", stepDesc);
SVnodeCfg cfg = {.pDnode = pDnode, .pTfs = pDnode->pTfs, .vgId = pCfg->vgId, .dbId = pCfg->dbUid};
SVnode * pImpl = vnodeOpen(pCfg->path, &cfg);
SVnode *pImpl = vnodeOpen(pCfg->path, &cfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
......@@ -499,31 +499,6 @@ static void dndCloseVnodes(SDnode *pDnode) {
dInfo("total vnodes:%d are all closed", numOfVnodes);
}
static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) {
SCreateVnodeReq *pCreate = pReq->pCont;
pCreate->vgId = htonl(pCreate->vgId);
pCreate->dnodeId = htonl(pCreate->dnodeId);
pCreate->dbUid = htobe64(pCreate->dbUid);
pCreate->vgVersion = htonl(pCreate->vgVersion);
pCreate->cacheBlockSize = htonl(pCreate->cacheBlockSize);
pCreate->totalBlocks = htonl(pCreate->totalBlocks);
pCreate->daysPerFile = htonl(pCreate->daysPerFile);
pCreate->daysToKeep0 = htonl(pCreate->daysToKeep0);
pCreate->daysToKeep1 = htonl(pCreate->daysToKeep1);
pCreate->daysToKeep2 = htonl(pCreate->daysToKeep2);
pCreate->minRows = htonl(pCreate->minRows);
pCreate->maxRows = htonl(pCreate->maxRows);
pCreate->commitTime = htonl(pCreate->commitTime);
pCreate->fsyncPeriod = htonl(pCreate->fsyncPeriod);
for (int r = 0; r < pCreate->replica; ++r) {
SReplica *pReplica = &pCreate->replicas[r];
pReplica->id = htonl(pReplica->id);
pReplica->port = htons(pReplica->port);
}
return pCreate;
}
static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->vgId = pCreate->vgId;
pCfg->wsize = pCreate->cacheBlockSize;
......@@ -556,37 +531,30 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra
pCfg->vgVersion = pCreate->vgVersion;
}
static SDropVnodeReq *dndParseDropVnodeReq(SRpcMsg *pReq) {
SDropVnodeReq *pDrop = pReq->pCont;
pDrop->vgId = htonl(pDrop->vgId);
return pDrop;
}
static SAuthVnodeReq *dndParseAuthVnodeReq(SRpcMsg *pReq) {
SAuthVnodeReq *pAuth = pReq->pCont;
pAuth->vgId = htonl(pAuth->vgId);
return pAuth;
}
int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq);
dDebug("vgId:%d, create vnode req is received", pCreate->vgId);
SCreateVnodeReq createReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dDebug("vgId:%d, create vnode req is received", createReq.vgId);
SVnodeCfg vnodeCfg = {0};
dndGenerateVnodeCfg(pCreate, &vnodeCfg);
dndGenerateVnodeCfg(&createReq, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
dndGenerateWrapperCfg(pDnode, pCreate, &wrapperCfg);
dndGenerateWrapperCfg(pDnode, &createReq, &wrapperCfg);
if (pCreate->dnodeId != dndGetDnodeId(pDnode)) {
if (createReq.dnodeId != dndGetDnodeId(pDnode)) {
terrno = TSDB_CODE_DND_VNODE_INVALID_OPTION;
dDebug("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
dDebug("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
}
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pCreate->vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, createReq.vgId);
if (pVnode != NULL) {
dDebug("vgId:%d, already exist", pCreate->vgId);
dDebug("vgId:%d, already exist", createReq.vgId);
dndReleaseVnode(pDnode, pVnode);
terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED;
return -1;
......@@ -597,13 +565,13 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
vnodeCfg.dbId = wrapperCfg.dbUid;
SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg);
if (pImpl == NULL) {
dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr());
dError("vgId:%d, failed to create vnode since %s", createReq.vgId, terrstr());
return -1;
}
int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl);
if (code != 0) {
dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr());
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
vnodeClose(pImpl);
vnodeDestroy(wrapperCfg.path);
terrno = code;
......@@ -622,32 +590,37 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq);
dDebug("vgId:%d, alter vnode req is received", pAlter->vgId);
SAlterVnodeReq alterReq = {0};
if (tDeserializeSCreateVnodeReq(pReq->pCont, pReq->contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
dDebug("vgId:%d, alter vnode req is received", alterReq.vgId);
SVnodeCfg vnodeCfg = {0};
dndGenerateVnodeCfg(pAlter, &vnodeCfg);
dndGenerateVnodeCfg(&alterReq, &vnodeCfg);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, alterReq.vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
dDebug("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
return -1;
}
if (pAlter->vgVersion == pVnode->vgVersion) {
if (alterReq.vgVersion == pVnode->vgVersion) {
dndReleaseVnode(pDnode, pVnode);
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId);
dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", alterReq.vgId);
return 0;
}
if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) {
dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr());
dError("vgId:%d, failed to alter vnode since %s", alterReq.vgId, terrstr());
dndReleaseVnode(pDnode, pVnode);
return -1;
}
int32_t oldVersion = pVnode->vgVersion;
pVnode->vgVersion = pAlter->vgVersion;
pVnode->vgVersion = alterReq.vgVersion;
int32_t code = dndWriteVnodesToFile(pDnode);
if (code != 0) {
pVnode->vgVersion = oldVersion;
......@@ -658,9 +631,13 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SDropVnodeReq *pDrop = dndParseDropVnodeReq(pReq);
SDropVnodeReq dropReq = {0};
if (tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t vgId = pDrop->vgId;
int32_t vgId = dropReq.vgId;
dDebug("vgId:%d, drop vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
......@@ -683,27 +660,11 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
return 0;
}
int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SAuthVnodeReq *pAuth = (SAuthVnodeReq *)dndParseAuthVnodeReq(pReq);
int32_t vgId = pAuth->vgId;
dDebug("vgId:%d, auth vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
if (pVnode == NULL) {
dDebug("vgId:%d, failed to auth since %s", vgId, terrstr());
return -1;
}
pVnode->accessState = pAuth->accessState;
dndReleaseVnode(pDnode, pVnode);
return 0;
}
int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SSyncVnodeReq *pSync = (SSyncVnodeReq *)dndParseDropVnodeReq(pReq);
SSyncVnodeReq syncReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &syncReq);
int32_t vgId = pSync->vgId;
int32_t vgId = syncReq.vgId;
dDebug("vgId:%d, sync vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
......@@ -723,9 +684,10 @@ int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
}
int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) {
SCompactVnodeReq *pCompact = (SCompactVnodeReq *)dndParseDropVnodeReq(pReq);
SCompactVnodeReq compatcReq = {0};
tDeserializeSDropVnodeReq(pReq->pCont, pReq->contLen, &compatcReq);
int32_t vgId = pCompact->vgId;
int32_t vgId = compatcReq.vgId;
dDebug("vgId:%d, compact vnode req is received", vgId);
SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId);
......
......@@ -27,10 +27,12 @@ Testbase DndTestBnode::test;
TEST_F(DndTestBnode, 01_Create_Bnode) {
{
int32_t contLen = sizeof(SDCreateBnodeReq);
SDCreateBnodeReq createReq = {0};
createReq.dnodeId = 2;
SDCreateBnodeReq* pReq = (SDCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -38,10 +40,12 @@ TEST_F(DndTestBnode, 01_Create_Bnode) {
}
{
int32_t contLen = sizeof(SDCreateBnodeReq);
SDCreateBnodeReq createReq = {0};
createReq.dnodeId = 1;
SDCreateBnodeReq* pReq = (SDCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -49,10 +53,12 @@ TEST_F(DndTestBnode, 01_Create_Bnode) {
}
{
int32_t contLen = sizeof(SDCreateBnodeReq);
SDCreateBnodeReq createReq = {0};
createReq.dnodeId = 1;
SDCreateBnodeReq* pReq = (SDCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -62,11 +68,12 @@ TEST_F(DndTestBnode, 01_Create_Bnode) {
test.Restart();
{
int32_t contLen = sizeof(SDCreateBnodeReq);
SDCreateBnodeReq* pReq = (SDCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
SDCreateBnodeReq createReq = {0};
createReq.dnodeId = 1;
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_DND_BNODE_ALREADY_DEPLOYED);
......@@ -75,10 +82,12 @@ TEST_F(DndTestBnode, 01_Create_Bnode) {
TEST_F(DndTestBnode, 01_Drop_Bnode) {
{
int32_t contLen = sizeof(SDDropBnodeReq);
SDDropBnodeReq dropReq = {0};
dropReq.dnodeId = 2;
SDDropBnodeReq* pReq = (SDDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(2);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -86,10 +95,12 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) {
}
{
int32_t contLen = sizeof(SDDropBnodeReq);
SDDropBnodeReq dropReq = {0};
dropReq.dnodeId = 1;
SDDropBnodeReq* pReq = (SDDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -97,10 +108,12 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) {
}
{
int32_t contLen = sizeof(SDDropBnodeReq);
SDDropBnodeReq dropReq = {0};
dropReq.dnodeId = 1;
SDDropBnodeReq* pReq = (SDDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -110,10 +123,12 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) {
test.Restart();
{
int32_t contLen = sizeof(SDDropBnodeReq);
SDDropBnodeReq dropReq = {0};
dropReq.dnodeId = 1;
SDDropBnodeReq* pReq = (SDDropBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &dropReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &dropReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......@@ -121,10 +136,12 @@ TEST_F(DndTestBnode, 01_Drop_Bnode) {
}
{
int32_t contLen = sizeof(SDCreateBnodeReq);
SDCreateBnodeReq createReq = {0};
createReq.dnodeId = 1;
SDCreateBnodeReq* pReq = (SDCreateBnodeReq*)rpcMallocCont(contLen);
pReq->dnodeId = htonl(1);
int32_t contLen = tSerializeSMCreateDropQSBNodeReq(NULL, 0, &createReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSMCreateDropQSBNodeReq(pReq, contLen, &createReq);
SRpcMsg* pRsp = test.SendReq(TDMT_DND_CREATE_BNODE, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......
......@@ -74,7 +74,7 @@ class Testbase {
private:
int64_t showId;
STableMetaRsp* pMeta;
STableMetaRsp metaRsp;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
......
......@@ -36,6 +36,9 @@ int32_t mndCheckCreateDbAuth(SUserObj *pOperUser);
int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb);
#ifdef __cplusplus
}
#endif
......
......@@ -24,9 +24,9 @@ extern "C" {
int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void **rsp, int32_t *rspLen);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
#ifdef __cplusplus
}
......
......@@ -24,12 +24,11 @@ extern "C" {
int32_t mndInitStb(SMnode *pMnode);
void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen);
#ifdef __cplusplus
}
......
......@@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups);
SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup);
int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId);
SCreateVnodeReq *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
SDropVnodeReq *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
#ifdef __cplusplus
}
......
......@@ -165,27 +165,27 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
......
此差异已折叠。
此差异已折叠。
......@@ -13,3 +13,4 @@ add_subdirectory(mnode)
add_subdirectory(db)
add_subdirectory(stb)
add_subdirectory(func)
add_subdirectory(topic)
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -13,3 +13,4 @@ add_subdirectory(function)
add_subdirectory(qcom)
add_subdirectory(qworker)
add_subdirectory(tfs)
add_subdirectory(nodes)
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册