提交 2ef7bf1c 编写于 作者: L Liu Jicong

merge from 3.0

......@@ -582,9 +582,9 @@ typedef struct SOrderOperatorInfo {
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime);
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
SOperatorInfo* createProjectOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
......@@ -622,7 +622,7 @@ void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
void* destroyOutputBuf(SSDataBlock* pBlock);
void* blockDataDestroy(SSDataBlock* pBlock);
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
......
......@@ -336,7 +336,7 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
return res;
}
void* destroyOutputBuf(SSDataBlock* pBlock) {
void* blockDataDestroy(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return NULL;
}
......@@ -4835,11 +4835,11 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
break;
}
case OP_TableSeqScan: {
pRuntimeEnv->proot = createTableSeqScanOperator(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
pRuntimeEnv->proot = createTableSeqScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv);
break;
}
case OP_DataBlocksOptScan: {
pRuntimeEnv->proot = createDataBlocksOptScanInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
pRuntimeEnv->proot = createTableScanOperatorInfo(pRuntimeEnv->pTsdbReadHandle, pRuntimeEnv, getNumOfScanTimes(pQueryAttr), pQueryAttr->needReverseScan? 1:0);
break;
}
case OP_TableScan: {
......@@ -5162,7 +5162,7 @@ SOperatorInfo* createTableScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv*
return pOperator;
}
SOperatorInfo* createTableSeqScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv) {
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
pInfo->pTsdbReadHandle = pTsdbQueryHandle;
......@@ -5267,7 +5267,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
}
}
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime) {
assert(repeatTime > 0);
STableScanInfo* pInfo = calloc(1, sizeof(STableScanInfo));
......@@ -5278,7 +5278,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
pInfo->order = pRuntimeEnv->pQueryAttr->order.order;
SOperatorInfo* pOptr = calloc(1, sizeof(SOperatorInfo));
pOptr->name = "DataBlocksOptimizedScanOperator";
pOptr->name = "TableScanOperator";
pOptr->operatorType = OP_DataBlocksOptScan;
pOptr->pRuntimeEnv = pRuntimeEnv;
pOptr->blockingOptr = false;
......@@ -5373,7 +5373,7 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
taosArrayDestroy(pInfo->orderColumnList);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->prevRow);
}
......@@ -6566,7 +6566,7 @@ static void doDestroyBasicInfo(SOptrBasicInfo* pInfo, int32_t numOfOutput) {
tfree(pInfo->rowCellInfoOffset);
cleanupResultRowInfo(&pInfo->resultRowInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6590,7 +6590,7 @@ static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
tfree(pInfo->p);
}
......@@ -6607,12 +6607,12 @@ static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput) {
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
STagScanInfo* pInfo = (STagScanInfo*) param;
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock);
}
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
......@@ -6625,7 +6625,7 @@ static void destroyDistinctOperatorInfo(void* param, int32_t numOfOutput) {
taosHashCleanup(pInfo->pSet);
tfree(pInfo->buf);
taosArrayDestroy(pInfo->pDistinctDataInfo);
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
}
SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
......
......@@ -13,7 +13,7 @@
namespace {
// simple test
void simpleTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1);
int32_t pageId = 0;
......@@ -22,40 +22,40 @@ void simpleTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
ASSERT_EQ(getResBufSize(pResultBuf), 1024);
ASSERT_EQ(getTotalBufSize(pResultBuf), 1024);
SIDList list = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(list), 1);
ASSERT_EQ(getNumOfResultBufGroupId(pResultBuf), 1);
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t = getResBufPage(pResultBuf, pageId);
tFilePage* t = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t == pBufPage1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage5);
destroyResultBuf(pResultBuf);
}
void writeDownTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
......@@ -68,31 +68,31 @@ void writeDownTest() {
*(int32_t*)(pBufPage->data) = nx;
writePageId = pageId;
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseBufPage(pResultBuf, t4);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
ASSERT_EQ(*(int32_t*)pBufPagex->data, nx);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
......@@ -102,7 +102,7 @@ void writeDownTest() {
}
void recyclePageTest() {
SDiskbasedResultBuf* pResultBuf = NULL;
SDiskbasedBuf* pResultBuf = NULL;
int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1);
int32_t pageId = 0;
......@@ -112,41 +112,41 @@ void recyclePageTest() {
tFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId);
ASSERT_TRUE(pBufPage != NULL);
releaseResBufPage(pResultBuf, pBufPage);
releaseBufPage(pResultBuf, pBufPage);
tFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t1 = getResBufPage(pResultBuf, pageId);
tFilePage* t1 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t1 == pBufPage1);
ASSERT_TRUE(pageId == 1);
tFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t2 = getResBufPage(pResultBuf, pageId);
tFilePage* t2 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t2 == pBufPage2);
ASSERT_TRUE(pageId == 2);
tFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t3 = getResBufPage(pResultBuf, pageId);
tFilePage* t3 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t3 == pBufPage3);
ASSERT_TRUE(pageId == 3);
tFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t4 = getResBufPage(pResultBuf, pageId);
tFilePage* t4 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseBufPage(pResultBuf, t4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t5 = getResBufPage(pResultBuf, pageId);
tFilePage* t5 = getBufPage(pResultBuf, pageId);
ASSERT_TRUE(t5 == pBufPage5);
ASSERT_TRUE(pageId == 5);
// flush the written page to disk, and read it out again
tFilePage* pBufPagex = getResBufPage(pResultBuf, writePageId);
tFilePage* pBufPagex = getBufPage(pResultBuf, writePageId);
*(int32_t*)(pBufPagex->data) = nx;
writePageId = pageId; // update the data
releaseResBufPage(pResultBuf, pBufPagex);
releaseBufPage(pResultBuf, pBufPagex);
tFilePage* pBufPagex1 = getResBufPage(pResultBuf, 1);
tFilePage* pBufPagex1 = getBufPage(pResultBuf, 1);
SArray* pa = getDataBufPagesIdList(pResultBuf, groupId);
ASSERT_EQ(taosArrayGetSize(pa), 6);
......
......@@ -16,6 +16,11 @@
#ifndef TDENGINE_COMMON_H
#define TDENGINE_COMMON_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taosdef.h"
#include "tarray.h"
#include "tmsg.h"
......@@ -44,8 +49,8 @@
typedef struct {
uint32_t numOfTables;
SArray* pGroupList;
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
typedef struct SColumnDataAgg {
......@@ -74,18 +79,28 @@ typedef struct SConstantItem {
// info.numOfCols = taosArrayGetSize(pDataBlock) + taosArrayGetSize(pConstantList);
typedef struct SSDataBlock {
SColumnDataAgg* pBlockAgg;
SArray* pDataBlock; // SArray<SColumnInfoData>
SArray* pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
SColumnDataAgg *pBlockAgg;
SArray *pDataBlock; // SArray<SColumnInfoData>
SArray *pConstantList; // SArray<SConstantItem>, it is a constant/tags value of the corresponding result value.
SDataBlockInfo info;
} SSDataBlock;
typedef struct SVarColAttr {
int32_t *offset; // start position for each entry in the list
uint32_t length; // used buffer size that contain the valid data
uint32_t allocLen; // allocated buffer size
} SVarColAttr;
// pBlockAgg->numOfNull == info.rows, all data are null
// pBlockAgg->numOfNull == 0, no data are null.
typedef struct SColumnInfoData {
SColumnInfo info; // TODO filter info needs to be removed
char* nullbitmap; //
char* pData; // the corresponding block data in memory
SColumnInfo info; // TODO filter info needs to be removed
bool hasNull;// if current column data has null value.
char *pData; // the corresponding block data in memory
union {
char *nullbitmap; // bitmap, one bit for each item in the list
SVarColAttr varmeta;
};
} SColumnInfoData;
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
......@@ -235,11 +250,11 @@ typedef struct SSqlExpr {
char token[TSDB_COL_NAME_LEN]; // original token
SSchema resSchema;
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
int32_t numOfCols;
SColumn* pColumns; // data columns that are required by query
int32_t interBytes; // inter result buffer size
int16_t numOfParams; // argument value of each function
SVariant param[3]; // parameters are not more than 3
} SSqlExpr;
typedef struct SExprInfo {
......@@ -261,4 +276,8 @@ typedef struct SSessionWindow {
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_COMMON_H
......@@ -7,12 +7,22 @@ extern "C" {
#include "os.h"
#include "tmsg.h"
#include "common.h"
typedef struct SCorEpSet {
int32_t version;
SEpSet epSet;
} SCorEpSet;
typedef struct SBlockOrderInfo {
int32_t order;
int32_t colIndex;
SColumnInfoData *pColData;
// int32_t type;
// int32_t bytes;
// bool hasNull;
} SBlockOrderInfo;
int taosGetFqdnPortFromEp(const char *ep, SEp *pEp);
void addEpIntoEpSet(SEpSet *pEpSet, const char *fqdn, uint16_t port);
......@@ -21,6 +31,77 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2);
void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet);
SEpSet getEpSet_s(SCorEpSet *pEpSet);
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
#define colDataIsNull_f(bm_, r_) ((BMCharPos(bm_, r_) & (1u << (7u - BitPos(r_)))) == (1u << (7u - BitPos(r_))))
#define colDataSetNull_f(bm_, r_) \
do { \
BMCharPos(bm_, r_) |= (1u << (7u - BitPos(r_))); \
} while (0)
static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row, SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {
return false;
}
if (pColAgg != NULL) {
if (pColAgg->numOfNull == totalRows) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return true;
} else if (pColAgg->numOfNull == 0) {
ASSERT(pColumnInfoData->nullbitmap == NULL);
return false;
}
}
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.offset[row] == -1;
} else {
if (pColumnInfoData->nullbitmap == NULL) {
return false;
}
return colDataIsNull_f(pColumnInfoData->nullbitmap, row);
}
}
#define colDataGet(p1_, r_) \
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? (p1_)->pData + (p1_)->varmeta.offset[(r_)] \
: (p1_)->pData + ((r_) * (p1_)->info.bytes));
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2);
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock);
int32_t colDataGetSize(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t colDataGetNumOfCols(const SSDataBlock* pBlock);
size_t colDataGetNumOfRows(const SSDataBlock* pBlock);
int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc);
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize);
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount);
int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock);
int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf);
size_t blockDataGetSize(const SSDataBlock* pBlock);
size_t blockDataGetRowSize(const SSDataBlock* pBlock);
double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock);
size_t blockDataNumOfRowsForSerialize(const SSDataBlock* pBlock, int32_t blockSize);
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
void *blockDataDestroy(SSDataBlock *pBlock);
#ifdef __cplusplus
}
#endif
......
......@@ -234,9 +234,9 @@ typedef struct {
void* pMsg;
} SSubmitMsgIter;
int tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
int32_t tInitSubmitMsgIter(SSubmitMsg* pMsg, SSubmitMsgIter* pIter);
int32_t tGetSubmitMsgNext(SSubmitMsgIter* pIter, SSubmitBlk** pPBlock);
int32_t tInitSubmitBlkIter(SSubmitBlk* pBlock, SSubmitBlkIter* pIter);
STSRow* tGetSubmitBlkNext(SSubmitBlkIter* pIter);
typedef struct {
......@@ -272,8 +272,8 @@ typedef struct {
char comment[TSDB_STB_COMMENT_LEN];
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void** buf, SMCreateStbReq* pReq);
void* tDeserializeSMCreateStbReq(void* buf, SMCreateStbReq* pReq);
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
int32_t tDeserializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
void tFreeSMCreateStbReq(SMCreateStbReq* pReq);
typedef struct {
......@@ -281,8 +281,8 @@ typedef struct {
int8_t igNotExists;
} SMDropStbReq;
int32_t tSerializeSMDropStbReq(void** buf, SMDropStbReq* pReq);
void* tDeserializeSMDropStbReq(void* buf, SMDropStbReq* pReq);
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......@@ -291,15 +291,9 @@ typedef struct {
SArray* pFields;
} SMAltertbReq;
int32_t tSerializeSMAlterStbReq(void** buf, SMAltertbReq* pReq);
void* tDeserializeSMAlterStbReq(void* buf, SMAltertbReq* pReq);
typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
int32_t tDeserializeSMAlterStbReq(void* buf, int32_t bufLen, SMAltertbReq* pReq);
void tFreeSMAltertbReq(SMAltertbReq* pReq);
typedef struct SEpSet {
int8_t inUse;
......@@ -307,56 +301,33 @@ typedef struct SEpSet {
SEp eps[TSDB_MAX_REPLICA];
} SEpSet;
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pEp->inUse);
tlen += taosEncodeFixedI8(buf, pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
tlen += taosEncodeFixedU16(buf, pEp->eps[i].port);
tlen += taosEncodeString(buf, pEp->eps[i].fqdn);
}
return tlen;
}
int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp);
int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(void* buf, SEpSet* pEp);
static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
buf = taosDecodeFixedI8(buf, &pEp->inUse);
buf = taosDecodeFixedI8(buf, &pEp->numOfEps);
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
buf = taosDecodeFixedU16(buf, &pEp->eps[i].port);
buf = taosDecodeStringTo(buf, pEp->eps[i].fqdn);
}
return buf;
}
static FORCE_INLINE int32_t tEncodeSEpSet(SCoder* pEncoder, const SEpSet* pEp) {
if (tEncodeI8(pEncoder, pEp->inUse) < 0) return -1;
if (tEncodeI8(pEncoder, pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tEncodeU16(pEncoder, pEp->eps[i].port) < 0) return -1;
if (tEncodeCStr(pEncoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
typedef struct {
int32_t pid;
char app[TSDB_APP_NAME_LEN];
char db[TSDB_DB_NAME_LEN];
int64_t startTime;
} SConnectReq;
static FORCE_INLINE int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp) {
if (tDecodeI8(pDecoder, &pEp->inUse) < 0) return -1;
if (tDecodeI8(pDecoder, &pEp->numOfEps) < 0) return -1;
for (int i = 0; i < TSDB_MAX_REPLICA; i++) {
if (tDecodeU16(pDecoder, &pEp->eps[i].port) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pEp->eps[i].fqdn) < 0) return -1;
}
return 0;
}
int32_t tSerializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
int32_t tDeserializeSConnectReq(void* buf, int32_t bufLen, SConnectReq* pReq);
typedef struct {
int32_t acctId;
int64_t clusterId;
int32_t connId;
int8_t superUser;
int8_t align[3];
SEpSet epSet;
char sVersion[128];
} SConnectRsp;
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
int32_t tDeserializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
typedef struct {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
......@@ -803,6 +774,9 @@ typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
} STableInfoReq;
int32_t tSerializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
int32_t tDeserializeSTableInfoReq(void* buf, int32_t bufLen, STableInfoReq* pReq);
typedef struct {
int8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups;
......@@ -839,9 +813,21 @@ typedef struct {
uint64_t suid;
uint64_t tuid;
int32_t vgId;
SSchema pSchema[];
SSchema* pSchemas;
} STableMetaRsp;
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
typedef struct {
SArray* pArray; // Array of STableMetaRsp
} STableMetaBatchRsp;
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);
typedef struct {
int32_t numOfTables;
int32_t numOfVgroup;
......@@ -875,18 +861,15 @@ int32_t tSerializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
int32_t tDeserializeSShowReq(void* buf, int32_t bufLen, SShowReq* pReq);
void tFreeSShowReq(SShowReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactReq;
typedef struct {
int64_t showId;
STableMetaRsp tableMeta;
} SShowRsp;
} SShowRsp, SVShowTablesRsp;
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void tFreeSShowRsp(SShowRsp* pRsp);
// todo: the show handle should be replaced with id
typedef struct {
int64_t showId;
int8_t free;
......@@ -1103,41 +1086,17 @@ typedef struct {
char* sql;
char* physicalPlan;
char* logicalPlan;
} SCMCreateTopicReq;
} SMCreateTopicReq;
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
int tlen = 0;
tlen += taosEncodeFixedI8(buf, pReq->igExists);
tlen += taosEncodeString(buf, pReq->name);
tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeString(buf, pReq->logicalPlan);
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) {
buf = taosDecodeFixedI8(buf, &(pReq->igExists));
buf = taosDecodeString(buf, &(pReq->name));
buf = taosDecodeString(buf, &(pReq->sql));
buf = taosDecodeString(buf, &(pReq->physicalPlan));
buf = taosDecodeString(buf, &(pReq->logicalPlan));
return buf;
}
int32_t tSerializeMCreateTopicReq(void** buf, const SMCreateTopicReq* pReq);
void* tDeserializeSMCreateTopicReq(void* buf, SMCreateTopicReq* pReq);
typedef struct {
int64_t topicId;
} SCMCreateTopicRsp;
} SMCreateTopicRsp;
static FORCE_INLINE int tSerializeSCMCreateTopicRsp(void** buf, const SCMCreateTopicRsp* pRsp) {
int tlen = 0;
tlen += taosEncodeFixedI64(buf, pRsp->topicId);
return tlen;
}
static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopicRsp* pRsp) {
buf = taosDecodeFixedI64(buf, &pRsp->topicId);
return buf;
}
int32_t tSerializeSMCreateTopicRsp(void* buf, int32_t bufLen, const SMCreateTopicRsp* pRsp);
int32_t tDeserializeSMCreateTopicRsp(void* buf, int32_t bufLen, SMCreateTopicRsp* pRsp);
typedef struct {
int32_t topicNum;
......@@ -1284,19 +1243,13 @@ typedef struct {
int64_t status;
} SMVSubscribeRsp;
typedef struct {
char name[TSDB_TOPIC_NAME_LEN];
int8_t igExists;
int32_t execLen;
void* executor;
int32_t sqlLen;
char* sql;
} SCreateTopicReq;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SDropTopicReq;
} SMDropTopicReq;
int32_t tSerializeSMDropTopicReqq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
......@@ -1398,11 +1351,6 @@ typedef struct {
SMsgHead head;
} SVShowTablesReq;
typedef struct {
int64_t id;
STableMetaRsp metaInfo;
} SVShowTablesRsp;
typedef struct {
SMsgHead head;
int32_t id;
......@@ -1607,7 +1555,7 @@ int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchR
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeCStrWithLen(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
return 0;
}
......@@ -1803,7 +1751,7 @@ typedef struct {
SSchema* pSchema;
} SSchemaWrapper;
static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) {
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
int32_t tlen = 0;
tlen += taosEncodeFixedI8(buf, pSchema->type);
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
......@@ -1812,7 +1760,7 @@ static FORCE_INLINE int32_t tEncodeSSchema(void** buf, const SSchema* pSchema) {
return tlen;
}
static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) {
static FORCE_INLINE void* taosDecodeSSchema(void* buf, SSchema* pSchema) {
buf = taosDecodeFixedI8(buf, &pSchema->type);
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
buf = taosDecodeFixedI32(buf, &pSchema->colId);
......@@ -1820,11 +1768,27 @@ static FORCE_INLINE void* tDecodeSSchema(void* buf, SSchema* pSchema) {
return buf;
}
static FORCE_INLINE int32_t tEncodeSSchema(SCoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI32(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SCoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI32(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
return 0;
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
int32_t tlen = 0;
tlen += taosEncodeFixedU32(buf, pSW->nCols);
for (int32_t i = 0; i < pSW->nCols; i++) {
tlen += tEncodeSSchema(buf, &pSW->pSchema[i]);
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
}
return tlen;
}
......@@ -1835,8 +1799,9 @@ static FORCE_INLINE void* tDecodeSSchemaWrapper(void* buf, SSchemaWrapper* pSW)
if (pSW->pSchema == NULL) {
return NULL;
}
for (int32_t i = 0; i < pSW->nCols; i++) {
buf = tDecodeSSchema(buf, &pSW->pSchema[i]);
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
}
return buf;
}
......
......@@ -139,7 +139,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_TRANS, "mnode-trans", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GRANT, "mnode-grant", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_AUTH, "mnode-auth", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SCMCreateTopicReq, SCMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "mnode-create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
......
......@@ -473,7 +473,7 @@ static int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType);
uint32_t len = 0;
......
......@@ -113,13 +113,13 @@ typedef enum ELogicConditionType {
} ELogicConditionType;
typedef struct SLogicConditionNode {
ENodeType type; // QUERY_NODE_LOGIC_CONDITION
SExprNode node; // QUERY_NODE_LOGIC_CONDITION
ELogicConditionType condType;
SNodeList* pParameterList;
} SLogicConditionNode;
typedef struct SIsNullCondNode {
ENodeType type; // QUERY_NODE_IS_NULL_CONDITION
SExprNode node; // QUERY_NODE_IS_NULL_CONDITION
SNode* pExpr;
bool isNull;
} SIsNullCondNode;
......
......@@ -24,7 +24,7 @@
#endif
OP_ENUM_MACRO(StreamScan)
OP_ENUM_MACRO(DataBlocksOptScan)
OP_ENUM_MACRO(TableScan)
OP_ENUM_MACRO(TableSeqScan)
OP_ENUM_MACRO(TagScan)
OP_ENUM_MACRO(SystemTableScan)
......
......@@ -22,28 +22,31 @@ extern "C" {
typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param);
typedef struct SLoserTreeNode {
typedef struct STreeNode {
int32_t index;
void *pData;
} SLoserTreeNode;
void *pData; // TODO remove it?
} STreeNode;
typedef struct SLoserTreeInfo {
int32_t numOfEntries;
int32_t totalEntries;
typedef struct SMultiwayMergeTreeInfo {
int32_t numOfSources;
int32_t totalSources;
__merge_compare_fn_t comparFn;
void * param;
SLoserTreeNode *pNode;
} SLoserTreeInfo;
struct STreeNode *pNode;
} SMultiwayMergeTreeInfo;
uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
#define tMergeTreeGetChosenIndex(t_) ((t_)->pNode[0].index)
#define tMergeTreeGetAdjustIndex(t_) (tMergeTreeGetChosenIndex(t_) + (t_)->numOfSources)
void tLoserTreeInit(SLoserTreeInfo *pTree);
int32_t tMergeTreeCreate(SMultiwayMergeTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn);
void tLoserTreeAdjust(SLoserTreeInfo *pTree, int32_t idx);
void tMergeTreeDestroy(SMultiwayMergeTreeInfo* pTree);
void tLoserTreeRebuild(SLoserTreeInfo *pTree);
void tMergeTreeAdjust(SMultiwayMergeTreeInfo *pTree, int32_t idx);
void tLoserTreeDisplay(SLoserTreeInfo *pTree);
void tMergeTreeRebuild(SMultiwayMergeTreeInfo *pTree);
void tMergeTreePrint(const SMultiwayMergeTreeInfo *pTree);
#ifdef __cplusplus
}
......
......@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TPAGEDFILE_H
#define TDENGINE_TPAGEDFILE_H
#ifndef TDENGINE_TPAGEDBUF_H
#define TDENGINE_TPAGEDBUF_H
#ifdef __cplusplus
extern "C" {
......@@ -26,57 +26,10 @@ extern "C" {
#include "tlockfree.h"
typedef struct SArray* SIDList;
typedef struct SPageDiskInfo {
int32_t offset;
int32_t length;
} SPageDiskInfo;
typedef struct SPageInfo {
SListNode* pn; // point to list node
int32_t pageId;
SPageDiskInfo info;
void* pData;
bool used; // set current page is in used
} SPageInfo;
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SResultBufStatis {
int32_t flushBytes;
int32_t loadBytes;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SResultBufStatis;
typedef struct SDiskbasedResultBuf {
int32_t numOfPages;
int64_t totalBufSize;
int64_t fileSize; // disk file size
FILE* file;
int32_t allocateId; // allocated page id
char* path; // file path
int32_t pageSize; // current used page size
int32_t inMemPages; // numOfPages that are allocated in memory
SHashObj* groupSet; // id hash table
SHashObj* all;
SList* lruList;
void* emptyDummyIdList; // dummy id list
void* assistBuf; // assistant buffer for compress/decompress data
SArray* pFree; // free area in file
bool comp; // compressed before flushed to disk
int32_t nextPos; // next page flush position
uint64_t qId; // for debug purpose
SResultBufStatis statis;
} SDiskbasedResultBuf;
typedef struct SPageInfo SPageInfo;
typedef struct SDiskbasedBuf SDiskbasedBuf;
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
#define DEFAULT_PAGE_SIZE (16384L)
typedef struct SFilePage {
......@@ -84,76 +37,84 @@ typedef struct SFilePage {
char data[];
} SFilePage;
typedef struct SDiskbasedBufStatis {
int64_t flushBytes;
int64_t loadBytes;
int32_t loadPages;
int32_t getPages;
int32_t releasePages;
int32_t flushPages;
} SDiskbasedBufStatis;
/**
* create disk-based result buffer
* @param pResultBuf
* @param pBuf
* @param rowSize
* @param pagesize
* @param inMemPages
* @param handle
* @return
*/
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
int32_t createDiskbasedBuffer(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir);
/**
*
* @param pResultBuf
* @param pBuf
* @param groupId
* @param pageId
* @return
*/
SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId);
SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId);
/**
*
* @param pResultBuf
* @param pBuf
* @param groupId
* @return
*/
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId);
/**
* get the specified buffer page by id
* @param pResultBuf
* @param pBuf
* @param id
* @return
*/
SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id);
SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id);
/**
* release the referenced buf pages
* @param pResultBuf
* @param pBuf
* @param page
*/
void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page);
void releaseBufPage(SDiskbasedBuf* pBuf, void* page);
/**
*
* @param pResultBuf
* @param pBuf
* @param pi
*/
void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi);
void releaseBufPageInfo(SDiskbasedBuf* pBuf, struct SPageInfo* pi);
/**
* get the total buffer size in the format of disk file
* @param pResultBuf
* @param pBuf
* @return
*/
size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf);
size_t getTotalBufSize(const SDiskbasedBuf* pBuf);
/**
* get the number of groups in the result buffer
* @param pResultBuf
* @param pBuf
* @return
*/
size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf);
size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pBuf);
/**
* destroy result buffer
* @param pResultBuf
* @param pBuf
*/
void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
void destroyResultBuf(SDiskbasedBuf* pBuf);
/**
*
......@@ -162,8 +123,49 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf);
*/
SPageInfo* getLastPageInfo(SIDList pList);
/**
*
* @param pPgInfo
* @return
*/
int32_t getPageId(const SPageInfo* pPgInfo);
/**
* Return the buffer page size.
* @param pBuf
* @return
*/
int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
* @return
*/
bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo
* @param dirty
*/
void setBufPageDirty(SFilePage* pPageInfo, bool dirty);
/**
* Print the statistics when closing this buffer
* @param pBuf
*/
void printStatisBeforeClose(SDiskbasedBuf* pBuf);
/**
* return buf statistics.
*/
SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TPAGEDFILE_H
#endif // TDENGINE_TPAGEDBUF_H
......@@ -70,62 +70,42 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
}
}
tFreeSUseDbBatchRsp(&batchUseRsp);
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t msgLen = 0;
int32_t code = 0;
int32_t schemaNum = 0;
while (msgLen < valueLen) {
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
rsp->numOfColumns = ntohl(rsp->numOfColumns);
rsp->suid = be64toh(rsp->suid);
rsp->dbId = be64toh(rsp->dbId);
STableMetaBatchRsp batchMetaRsp = {0};
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);
if (rsp->numOfColumns < 0) {
schemaNum = 0;
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
} else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
rsp->numOfTags = ntohl(rsp->numOfTags);
rsp->sversion = ntohl(rsp->sversion);
rsp->tversion = ntohl(rsp->tversion);
rsp->tuid = be64toh(rsp->tuid);
rsp->vgId = ntohl(rsp->vgId);
SSchema* pSchema = rsp->pSchema;
schemaNum = rsp->numOfColumns + rsp->numOfTags;
for (int i = 0; i < schemaNum; ++i) {
pSchema->bytes = ntohl(pSchema->bytes);
pSchema->colId = ntohl(pSchema->colId);
pSchema++;
}
if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp);
return TSDB_CODE_TSC_INVALID_VALUE;
}
}
catalogUpdateSTableMeta(pCatalog, rsp);
}
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
}
tFreeSTableMetaBatchRsp(&batchMetaRsp);
return TSDB_CODE_SUCCESS;
}
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == info) {
......
......@@ -9,7 +9,7 @@
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tpagedbuf.h"
#include "tref.h"
static int32_t initEpSetFromCfg(const char *firstEp, const char *secondEp, SCorEpSet *pEpSet);
......@@ -357,40 +357,38 @@ STscObj* taosConnectImpl(const char *user, const char *auth, const char *db, __t
return pTscObj;
}
static SMsgSendInfo* buildConnectMsg(SRequestObj *pRequest) {
SMsgSendInfo *pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest) {
SMsgSendInfo* pMsgSendInfo = calloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->msgInfo.len = sizeof(SConnectReq);
pMsgSendInfo->msgType = TDMT_MND_CONNECT;
pMsgSendInfo->requestObjRefId = pRequest->self;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest;
pMsgSendInfo->requestId = pRequest->requestId;
pMsgSendInfo->fp = handleRequestRspFp[TMSG_INDEX(pMsgSendInfo->msgType)];
pMsgSendInfo->param = pRequest;
SConnectReq *pConnect = calloc(1, sizeof(SConnectReq));
if (pConnect == NULL) {
tfree(pMsgSendInfo);
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
STscObj *pObj = pRequest->pTscObj;
SConnectReq connectReq = {0};
STscObj* pObj = pRequest->pTscObj;
char* db = getDbOfConnection(pObj);
if (db != NULL) {
tstrncpy(pConnect->db, db, sizeof(pConnect->db));
tstrncpy(connectReq.db, db, sizeof(connectReq.db));
}
tfree(db);
pConnect->pid = htonl(appInfo.pid);
pConnect->startTime = htobe64(appInfo.startTime);
tstrncpy(pConnect->app, appInfo.appName, tListLen(pConnect->app));
connectReq.pid = htonl(appInfo.pid);
connectReq.startTime = htobe64(appInfo.startTime);
tstrncpy(connectReq.app, appInfo.appName, sizeof(connectReq.app));
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = malloc(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
pMsgSendInfo->msgInfo.pData = pConnect;
pMsgSendInfo->msgInfo.len = contLen;
pMsgSendInfo->msgInfo.pData = pReq;
return pMsgSendInfo;
}
......
......@@ -20,14 +20,14 @@
#include "clientLog.h"
#include "catalog.h"
int (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
static void setErrno(SRequestObj* pRequest, int32_t code) {
pRequest->code = code;
terrno = code;
}
int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
setErrno(pRequest, code);
......@@ -36,7 +36,7 @@ int genericRspCallback(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
SRequestObj* pRequest = param;
if (code != TSDB_CODE_SUCCESS) {
free(pMsg->pData);
......@@ -45,41 +45,35 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
STscObj *pTscObj = pRequest->pTscObj;
STscObj* pTscObj = pRequest->pTscObj;
SConnectRsp *pConnect = (SConnectRsp *)pMsg->pData;
pConnect->acctId = htonl(pConnect->acctId);
pConnect->connId = htonl(pConnect->connId);
pConnect->clusterId = htobe64(pConnect->clusterId);
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pData, pMsg->len, &connectRsp);
assert(connectRsp.epSet.numOfEps > 0);
assert(pConnect->epSet.numOfEps > 0);
for(int32_t i = 0; i < pConnect->epSet.numOfEps; ++i) {
pConnect->epSet.eps[i].port = htons(pConnect->epSet.eps[i].port);
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &connectRsp.epSet);
}
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, &pConnect->epSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, &pConnect->epSet);
for (int32_t i = 0; i < connectRsp.epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%" PRIx64, pRequest->requestId, i,
connectRsp.epSet.eps[i].fqdn, connectRsp.epSet.eps[i].port, pTscObj->id);
}
for (int i = 0; i < pConnect->epSet.numOfEps; ++i) {
tscDebug("0x%" PRIx64 " epSet.fqdn[%d]:%s port:%d, connObj:0x%"PRIx64, pRequest->requestId, i, pConnect->epSet.eps[i].fqdn,
pConnect->epSet.eps[i].port, pTscObj->id);
}
pTscObj->connId = pConnect->connId;
pTscObj->acctId = pConnect->acctId;
tstrncpy(pTscObj->ver, pConnect->sVersion, tListLen(pTscObj->ver));
pTscObj->connId = connectRsp.connId;
pTscObj->acctId = connectRsp.acctId;
tstrncpy(pTscObj->ver, connectRsp.sVersion, tListLen(pTscObj->ver));
// update the appInstInfo
pTscObj->pAppInfo->clusterId = pConnect->clusterId;
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
atomic_add_fetch_64(&pTscObj->pAppInfo->numOfConns, 1);
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, connectRsp.connId, connectRsp.clusterId, HEARTBEAT_TYPE_QUERY);
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, connectRsp.clusterId,
pTscObj->pAppInfo->numOfConns);
free(pMsg->pData);
......@@ -135,39 +129,29 @@ int32_t processShowRsp(void* param, const SDataBuf* pMsg, int32_t code) {
return code;
}
SShowRsp* pShow = (SShowRsp *)pMsg->pData;
pShow->showId = htobe64(pShow->showId);
STableMetaRsp *pMetaMsg = &(pShow->tableMeta);
pMetaMsg->numOfColumns = htonl(pMetaMsg->numOfColumns);
SShowRsp showRsp = {0};
tDeserializeSShowRsp(pMsg->pData, pMsg->len, &showRsp);
STableMetaRsp *pMetaMsg = &showRsp.tableMeta;
SSchema* pSchema = pMetaMsg->pSchema;
pMetaMsg->tuid = htobe64(pMetaMsg->tuid);
for (int i = 0; i < pMetaMsg->numOfColumns; ++i) {
pSchema->bytes = htonl(pSchema->bytes);
pSchema->colId = htonl(pSchema->colId);
pSchema++;
}
pSchema = pMetaMsg->pSchema;
tfree(pRequest->body.resInfo.pRspMsg);
pRequest->body.resInfo.pRspMsg = pMsg->pData;
SReqResultInfo* pResInfo = &pRequest->body.resInfo;
if (pResInfo->fields == NULL) {
TAOS_FIELD* pFields = calloc(pMetaMsg->numOfColumns, sizeof(TAOS_FIELD));
for (int32_t i = 0; i < pMetaMsg->numOfColumns; ++i) {
tstrncpy(pFields[i].name, pSchema[i].name, tListLen(pFields[i].name));
pFields[i].type = pSchema[i].type;
pFields[i].bytes = pSchema[i].bytes;
SSchema* pSchema = &pMetaMsg->pSchemas[i];
tstrncpy(pFields[i].name, pSchema->name, tListLen(pFields[i].name));
pFields[i].type = pSchema->type;
pFields[i].bytes = pSchema->bytes;
}
pResInfo->fields = pFields;
}
pResInfo->numOfCols = pMetaMsg->numOfColumns;
pRequest->body.showInfo.execId = pShow->showId;
pRequest->body.showInfo.execId = showRsp.showId;
tFreeSShowRsp(&showRsp);
// todo
if (pRequest->type == TDMT_VND_SHOW_TABLES) {
......
......@@ -25,7 +25,7 @@
#include "tglobal.h"
#include "tmsgtype.h"
#include "tnote.h"
#include "tpagedfile.h"
#include "tpagedbuf.h"
#include "tref.h"
struct tmq_list_t {
......@@ -381,7 +381,7 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
char topicFname[TSDB_TOPIC_FNAME_LEN] = {0};
tNameExtractFullName(&name, topicFname);
SCMCreateTopicReq req = {
SMCreateTopicReq req = {
.name = (char*)topicFname,
.igExists = 1,
.physicalPlan = (char*)pStr,
......@@ -389,14 +389,14 @@ TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, i
.logicalPlan = (char*)"no logic plan",
};
int tlen = tSerializeSCMCreateTopicReq(NULL, &req);
int tlen = tSerializeMCreateTopicReq(NULL, &req);
void* buf = malloc(tlen);
if (buf == NULL) {
goto _return;
}
void* abuf = buf;
tSerializeSCMCreateTopicReq(&abuf, &req);
tSerializeMCreateTopicReq(&abuf, &req);
/*printf("formatted: %s\n", dagStr);*/
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
......
此差异已折叠。
此差异已折叠。
#include <common.h>
#include "os.h"
#include "tutil.h"
......@@ -268,4 +269,5 @@ SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* nam
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
\ No newline at end of file
}
......@@ -344,8 +344,8 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
UNUSED(ret);
}
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
int32_t ret = fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
ret = fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
// NOTE: mix types tags are not supported
size_t sz = 0;
......
#include <common.h>
#include <gtest/gtest.h>
#include <tep.h>
#include <iostream>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wwrite-strings"
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wunused-but-set-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
......@@ -96,4 +97,199 @@ TEST(testCase, toInteger_test) {
ASSERT_EQ(ret, -1);
}
TEST(testCase, Datablock_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(40, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (40/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(40, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char* str = "the value of: %d";
char buf[128] = {0};
char varbuf[128] = {0};
for(int32_t i = 0; i < 40; ++i) {
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
if (i&0x01) {
int32_t len = sprintf(buf, str, i);
STR_TO_VARSTR(varbuf, buf)
colDataAppend(p0, i, (const char*) &i, false);
colDataAppend(p1, i, (const char*) varbuf, false);
memset(varbuf, 0, sizeof(varbuf));
memset(buf, 0, sizeof(buf));
} else {
colDataAppend(p0, i, (const char*) &i, true);
colDataAppend(p1, i, (const char*) varbuf, true);
}
b->info.rows++;
}
SColumnInfoData* p0 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData *) taosArrayGet(b->pDataBlock, 1);
for(int32_t i = 0; i < 40; ++i) {
if (i & 0x01) {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), false);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), false);
} else {
ASSERT_EQ(colDataIsNull_f(p0->nullbitmap, i), true);
ASSERT_EQ(colDataIsNull(p0, b->info.rows, i, nullptr), true);
ASSERT_EQ(colDataIsNull(p1, b->info.rows, i, nullptr), true);
}
}
printf("binary column length:%d\n", *(int32_t*) p1->pData);
ASSERT_EQ(colDataGetNumOfCols(b), 2);
ASSERT_EQ(colDataGetNumOfRows(b), 40);
char* pData = colDataGet(p1, 3);
printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData));
SArray* pOrderInfo = taosArrayInit(3, sizeof(SBlockOrderInfo));
SBlockOrderInfo order = {.order = TSDB_ORDER_ASC, .colIndex = 0};
taosArrayPush(pOrderInfo, &order);
blockDataSort(b, pOrderInfo, true);
blockDataDestroy(b);
taosArrayDestroy(pOrderInfo);
}
#if 0
TEST(testCase, non_var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
int32_t numOfRows = 1000000;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 1;
infoData1.info.type = TSDB_DATA_TYPE_TINYINT;
infoData1.info.colId = 2;
infoData1.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData1.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData1);
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
colDataAppend(p1, i, (const char*)&v, false);
b->info.rows++;
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, false, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#endif
TEST(testCase, var_dataBlock_split_test) {
SSDataBlock* b = static_cast<SSDataBlock*>(calloc(1, sizeof(SSDataBlock)));
b->info.numOfCols = 2;
b->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData));
int32_t numOfRows = 1000000;
SColumnInfoData infoData = {0};
infoData.info.bytes = 4;
infoData.info.type = TSDB_DATA_TYPE_INT;
infoData.info.colId = 1;
infoData.pData = (char*) calloc(numOfRows, infoData.info.bytes);
infoData.nullbitmap = (char*) calloc(1, sizeof(char) * (numOfRows/8));
taosArrayPush(b->pDataBlock, &infoData);
SColumnInfoData infoData1 = {0};
infoData1.info.bytes = 40;
infoData1.info.type = TSDB_DATA_TYPE_BINARY;
infoData1.info.colId = 2;
infoData1.varmeta.offset = (int32_t*) calloc(numOfRows, sizeof(uint32_t));
taosArrayPush(b->pDataBlock, &infoData1);
char buf[41] = {0};
char buf1[100] = {0};
for(int32_t i = 0; i < numOfRows; ++i) {
SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0);
SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1);
int8_t v = i;
colDataAppend(p0, i, (const char*)&i, false);
sprintf(buf, "the number of row:%d", i);
int32_t len = sprintf(buf1, buf, i);
STR_TO_VARSTR(buf1, buf)
colDataAppend(p1, i, buf1, false);
b->info.rows++;
memset(buf, 0, sizeof(buf));
memset(buf1, 0, sizeof(buf1));
}
int32_t pageSize = 64 * 1024;
int32_t startIndex= 0;
int32_t stopIndex = 0;
int32_t count = 1;
while(1) {
blockDataSplitRows(b, true, startIndex, &stopIndex, pageSize);
printf("the %d split, from: %d to %d\n", count++, startIndex, stopIndex);
if (stopIndex == numOfRows - 1) {
break;
}
startIndex = stopIndex + 1;
}
}
#pragma GCC diagnostic pop
\ No newline at end of file
......@@ -74,7 +74,7 @@ class Testbase {
private:
int64_t showId;
STableMetaRsp* pMeta;
STableMetaRsp metaRsp;
SRetrieveTableRsp* pRetrieveRsp;
char* pData;
int32_t pos;
......
......@@ -56,9 +56,16 @@ void Testbase::Init(const char* path, int16_t port) {
server.Start(path, fqdn, port, firstEp);
client.Init("root", "taosdata", fqdn, port);
taosMsleep(1100);
tFreeSTableMetaRsp(&metaRsp);
showId = 0;
pData = 0;
pos = 0;
pRetrieveRsp = NULL;
}
void Testbase::Cleanup() {
tFreeSTableMetaRsp(&metaRsp);
server.Stop();
client.Cleanup();
dndCleanup();
......@@ -85,51 +92,43 @@ void Testbase::SendShowMetaReq(int8_t showType, const char* db) {
strcpy(showReq.db, db);
int32_t contLen = tSerializeSShowReq(NULL, 0, &showReq);
char* pReq = (char*)rpcMallocCont(contLen);
void* pReq = rpcMallocCont(contLen);
tSerializeSShowReq(pReq, contLen, &showReq);
tFreeSShowReq(&showReq);
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen);
SShowRsp* pShowRsp = (SShowRsp*)pRsp->pCont;
ASSERT(pShowRsp != nullptr);
pShowRsp->showId = htobe64(pShowRsp->showId);
pMeta = &pShowRsp->tableMeta;
pMeta->numOfTags = htonl(pMeta->numOfTags);
pMeta->numOfColumns = htonl(pMeta->numOfColumns);
pMeta->sversion = htonl(pMeta->sversion);
pMeta->tversion = htonl(pMeta->tversion);
pMeta->tuid = htobe64(pMeta->tuid);
pMeta->suid = htobe64(pMeta->suid);
SRpcMsg* pRsp = SendReq(TDMT_MND_SHOW, pReq, contLen);
ASSERT(pRsp->pCont != nullptr);
showId = pShowRsp->showId;
SShowRsp showRsp = {0};
tDeserializeSShowRsp(pRsp->pCont, pRsp->contLen, &showRsp);
tFreeSTableMetaRsp(&metaRsp);
metaRsp = showRsp.tableMeta;
showId = showRsp.showId;
}
int32_t Testbase::GetMetaColId(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->colId = htonl(pSchema->colId);
SSchema* pSchema = &metaRsp.pSchemas[index];
return pSchema->colId;
}
int8_t Testbase::GetMetaType(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
SSchema* pSchema = &metaRsp.pSchemas[index];
return pSchema->type;
}
int32_t Testbase::GetMetaBytes(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
pSchema->bytes = htonl(pSchema->bytes);
SSchema* pSchema = &metaRsp.pSchemas[index];
return pSchema->bytes;
}
const char* Testbase::GetMetaName(int32_t index) {
SSchema* pSchema = &pMeta->pSchema[index];
SSchema* pSchema = &metaRsp.pSchemas[index];
return pSchema->name;
}
int32_t Testbase::GetMetaNum() { return pMeta->numOfColumns; }
int32_t Testbase::GetMetaNum() { return metaRsp.numOfColumns; }
const char* Testbase::GetMetaTbName() { return pMeta->tbName; }
const char* Testbase::GetMetaTbName() { return metaRsp.tbName; }
void Testbase::SendShowRetrieveReq() {
SRetrieveTableReq retrieveReq = {0};
......@@ -150,7 +149,7 @@ void Testbase::SendShowRetrieveReq() {
pos = 0;
}
const char* Testbase::GetShowName() { return pMeta->tbName; }
const char* Testbase::GetShowName() { return metaRsp.tbName; }
int8_t Testbase::GetShowInt8() {
int8_t data = *((int8_t*)(pData + pos));
......@@ -191,6 +190,6 @@ const char* Testbase::GetShowBinary(int32_t len) {
int32_t Testbase::GetShowRows() { return pRetrieveRsp->numOfRows; }
STableMetaRsp* Testbase::GetShowMeta() { return pMeta; }
STableMetaRsp* Testbase::GetShowMeta() { return &metaRsp; }
SRetrieveTableRsp* Testbase::GetRetrieveRsp() { return pRetrieveRsp; }
\ No newline at end of file
......@@ -36,6 +36,9 @@ int32_t mndCheckCreateDbAuth(SUserObj *pOperUser);
int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb);
int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb);
#ifdef __cplusplus
}
#endif
......
......@@ -24,7 +24,7 @@ extern "C" {
int32_t mndInitDb(SMnode *pMnode);
void mndCleanupDb(SMnode *pMnode);
SDbObj *mndAcquireDb(SMnode *pMnode, char *db);
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
......
......@@ -22,14 +22,13 @@
extern "C" {
#endif
int32_t mndInitStb(SMnode *pMnode);
void mndCleanupStb(SMnode *pMnode);
int32_t mndInitStb(SMnode *pMnode);
void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen);
#ifdef __cplusplus
}
......
......@@ -141,3 +141,29 @@ int32_t mndCheckAlterDropCompactSyncDbAuth(SUserObj *pOperUser, SDbObj *pDb) {
}
int32_t mndCheckUseDbAuth(SUserObj *pOperUser, SDbObj *pDb) { return 0; }
int32_t mndCheckWriteAuth(SUserObj *pOperUser, SDbObj *pDb) {
if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) {
return 0;
}
if (taosHashGet(pOperUser->writeDbs, pDb->name, strlen(pDb->name) + 1) != NULL) {
return 0;
}
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
int32_t mndCheckReadAuth(SUserObj *pOperUser, SDbObj *pDb) {
if (pOperUser->superUser || strcmp(pOperUser->user, pDb->createUser) == 0) {
return 0;
}
if (taosHashGet(pOperUser->readDbs, pDb->name, strlen(pDb->name) + 1) != NULL) {
return 0;
}
terrno = TSDB_CODE_MND_NO_RIGHTS;
return -1;
}
\ No newline at end of file
......@@ -433,27 +433,27 @@ static int32_t mndGetBnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -165,27 +165,27 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
static int32_t mndGetClusterMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
strcpy(pMeta->tbName, mndShowStr(pShow->type));
pShow->numOfColumns = cols;
......
......@@ -14,8 +14,8 @@
*/
#define _DEFAULT_SOURCE
#include "mndConsumer.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
......@@ -53,13 +53,14 @@ int32_t mndInitConsumer(SMnode *pMnode) {
void mndCleanupConsumer(SMnode *pMnode) {}
SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
SMqConsumerObj* pConsumer = malloc(sizeof(SMqConsumerObj));
SMqConsumerObj *mndCreateConsumer(int64_t consumerId, const char *cgroup) {
SMqConsumerObj *pConsumer = calloc(1, sizeof(SMqConsumerObj));
if (pConsumer == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pConsumer->recentRemovedTopics = taosArrayInit(0, sizeof(char*));
pConsumer->recentRemovedTopics = taosArrayInit(1, sizeof(char *));
pConsumer->epoch = 1;
pConsumer->consumerId = consumerId;
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__INIT);
......@@ -70,7 +71,8 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
void *buf = NULL;
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
......@@ -105,7 +107,7 @@ CM_ENCODE_OVER:
SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
void* buf = NULL;
void *buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
......
......@@ -18,6 +18,7 @@
#include "mndAuth.h"
#include "mndDnode.h"
#include "mndShow.h"
#include "mndStb.h"
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
......@@ -194,7 +195,7 @@ static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew) {
return 0;
}
SDbObj *mndAcquireDb(SMnode *pMnode, char *db) {
SDbObj *mndAcquireDb(SMnode *pMnode, const char *db) {
SSdb *pSdb = pMnode->pSdb;
SDbObj *pDb = sdbAcquire(pSdb, SDB_DB, db);
if (pDb == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
......@@ -722,6 +723,24 @@ static int32_t mndSetDropDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD
sdbRelease(pSdb, pVgroup);
}
while (1) {
SStbObj *pStb = NULL;
pIter = sdbFetch(pSdb, SDB_STB, pIter, (void **)&pStb);
if (pIter == NULL) break;
if (pStb->dbUid == pDb->uid) {
SSdbRaw *pStbRaw = mndStbActionEncode(pStb);
if (pStbRaw == NULL || mndTransAppendCommitlog(pTrans, pStbRaw) != 0) {
sdbCancelFetch(pSdb, pIter);
sdbRelease(pSdb, pStbRaw);
return -1;
}
sdbSetRawStatus(pStbRaw, SDB_STATUS_DROPPED);
}
sdbRelease(pSdb, pStb);
}
return 0;
}
......@@ -1111,117 +1130,117 @@ static int32_t mndGetDbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMe
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = (TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vgroups");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "ntables");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "replica");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "quorum");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "days");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "keep0,keep1,keep2");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "cache");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "blocks");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "minrows");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "maxrows");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "wallevel");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "fsync");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "comp");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "cachelast");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 3 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "precision");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_TINYINT;
strcpy(pSchema[cols].name, "update");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -628,21 +628,21 @@ static int32_t mndProcessConfigDnodeRsp(SMnodeMsg *pRsp) {
static int32_t mndGetConfigMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta) {
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_CONFIG_OPTION_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
tstrncpy(pSchema[cols].name, "name", sizeof(pSchema[cols].name));
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_CONIIG_VALUE_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
tstrncpy(pSchema[cols].name, "value", sizeof(pSchema[cols].name));
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......@@ -705,51 +705,51 @@ static int32_t mndGetDnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnodes");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "support_vnodes");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "status");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 24 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "offline_reason");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -468,51 +468,51 @@ static int32_t mndGetFuncMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_FUNC_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = PATH_MAX + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "comment");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "aggregate");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_TYPE_STR_MAX_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "outputtype");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "code_len");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "bufsize");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -243,7 +243,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
pEpSet->inUse = pEpSet->numOfEps;
}
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, htons(pObj->pDnode->port));
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj);
}
}
......@@ -620,39 +620,39 @@ static int32_t mndGetMnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "role");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "role_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -184,15 +184,17 @@ static void mndCancelGetNextConn(SMnode *pMnode, void *pIter) {
}
static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SConnObj *pConn = NULL;
int32_t code = -1;
SConnectReq *pConnReq = pReq->rpcMsg.pCont;
pConnReq->pid = htonl(pConnReq->pid);
pConnReq->startTime = htobe64(pConnReq->startTime);
SMnode *pMnode = pReq->pMnode;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SConnObj *pConn = NULL;
int32_t code = -1;
SConnectReq connReq = {0};
if (tDeserializeSConnectReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &connReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CONN_OVER;
}
SRpcConnInfo info = {0};
if (rpcGetConnInfo(pReq->rpcMsg.handle, &info) != 0) {
......@@ -209,41 +211,42 @@ static int32_t mndProcessConnectReq(SMnodeMsg *pReq) {
goto CONN_OVER;
}
if (pConnReq->db[0]) {
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, pConnReq->db);
if (connReq.db[0]) {
snprintf(pReq->db, TSDB_DB_FNAME_LEN, "%d%s%s", pUser->acctId, TS_PATH_DELIMITER, connReq.db);
pDb = mndAcquireDb(pMnode, pReq->db);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_INVALID_DB;
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, pConnReq->db, terrstr());
mError("user:%s, failed to login from %s while use db:%s since %s", pReq->user, ip, connReq.db, terrstr());
goto CONN_OVER;
}
}
pConn = mndCreateConn(pMnode, &info, pConnReq->pid, pConnReq->app, pConnReq->startTime);
pConn = mndCreateConn(pMnode, &info, connReq.pid, connReq.app, connReq.startTime);
if (pConn == NULL) {
mError("user:%s, failed to login from %s while create connection since %s", pReq->user, ip, terrstr());
goto CONN_OVER;
}
SConnectRsp *pRsp = rpcMallocCont(sizeof(SConnectRsp));
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("user:%s, failed to login from %s while create rsp since %s", pReq->user, ip, terrstr());
goto CONN_OVER;
}
SConnectRsp connectRsp = {0};
connectRsp.acctId = pUser->acctId;
connectRsp.superUser = pUser->superUser;
connectRsp.clusterId = pMnode->clusterId;
connectRsp.connId = pConn->id;
pRsp->acctId = htonl(pUser->acctId);
pRsp->superUser = pUser->superUser;
pRsp->clusterId = htobe64(pMnode->clusterId);
pRsp->connId = htonl(pConn->id);
snprintf(connectRsp.sVersion, sizeof(connectRsp.sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo,
gitinfo);
mndGetMnodeEpSet(pMnode, &connectRsp.epSet);
snprintf(pRsp->sVersion, tListLen(pRsp->sVersion), "ver:%s\nbuild:%s\ngitinfo:%s", version, buildinfo, gitinfo);
mndGetMnodeEpSet(pMnode, &pRsp->epSet);
int32_t contLen = tSerializeSConnectRsp(NULL, 0, &connectRsp);
if (contLen < 0) goto CONN_OVER;
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) goto CONN_OVER;
tSerializeSConnectRsp(pRsp, contLen, &connectRsp);
pReq->contLen = sizeof(SConnectRsp);
pReq->contLen = contLen;
pReq->pCont = pRsp;
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, pConnReq->app);
mDebug("user:%s, login from %s, conn:%d, app:%s", info.user, ip, pConn->id, connReq.app);
code = 0;
......@@ -578,53 +581,53 @@ static int32_t mndGetConnsMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
// app name
pShow->bytes[cols] = TSDB_APP_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "program");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
// app pid
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "pid");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "login_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "last_access");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......@@ -707,93 +710,93 @@ static int32_t mndGetQueryMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
mndReleaseUser(pMnode, pUser);
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "queryId");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "connId");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "user");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip:port");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 22 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "qid");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "created_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_BIGINT;
strcpy(pSchema[cols].name, "time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = QUERY_OBJ_ID_SIZE + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sql_obj_id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "pid");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ep");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 1;
pSchema[cols].type = TSDB_DATA_TYPE_BOOL;
strcpy(pSchema[cols].name, "stable_query");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "sub_queries");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sub_query_info");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "sql");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -433,27 +433,27 @@ static int32_t mndGetQnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -120,6 +120,7 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
SShowMgmt *pMgmt = &pMnode->showMgmt;
int32_t code = -1;
SShowReq showReq = {0};
SShowRsp showRsp = {0};
if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
......@@ -142,25 +143,26 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
goto SHOW_OVER;
}
int32_t size = sizeof(SShowRsp) + sizeof(SSchema) * TSDB_MAX_COLUMNS + TSDB_EXTRA_PAYLOAD_SIZE;
SShowRsp *pRsp = rpcMallocCont(size);
if (pRsp == NULL) {
showRsp.showId = pShow->id;
showRsp.tableMeta.pSchemas = calloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
if (showRsp.tableMeta.pSchemas == NULL) {
mndReleaseShowObj(pShow, true);
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto SHOW_OVER;
}
code = (*metaFp)(pReq, pShow, &pRsp->tableMeta);
code = (*metaFp)(pReq, pShow, &showRsp.tableMeta);
mDebug("show:0x%" PRIx64 ", get meta finished, numOfRows:%d cols:%d showReq.type:%s, result:%s", pShow->id,
pShow->numOfRows, pShow->numOfColumns, mndShowStr(showReq.type), tstrerror(code));
if (code == TSDB_CODE_SUCCESS) {
pReq->contLen = sizeof(SShowRsp) + sizeof(SSchema) * pShow->numOfColumns;
pReq->pCont = pRsp;
pRsp->showId = htobe64(pShow->id);
if (code == 0) {
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
void *pBuf = rpcMallocCont(bufLen);
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
pReq->contLen = bufLen;
pReq->pCont = pBuf;
mndReleaseShowObj(pShow, false);
} else {
rpcFreeCont(pRsp);
mndReleaseShowObj(pShow, true);
}
......@@ -170,6 +172,7 @@ SHOW_OVER:
}
tFreeSShowReq(&showReq);
tFreeSShowRsp(&showRsp);
return code;
}
......
......@@ -436,27 +436,27 @@ static int32_t mndGetSnodeMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "id");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "endpoint");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "mndStb.h"
#include "mndAuth.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
......@@ -27,7 +28,6 @@
#define TSDB_STB_VER_NUMBER 1
#define TSDB_STB_RESERVE_SIZE 64
static SSdbRaw *mndStbActionEncode(SStbObj *pStb);
static SSdbRow *mndStbActionDecode(SSdbRaw *pRaw);
static int32_t mndStbActionInsert(SSdb *pSdb, SStbObj *pStb);
static int32_t mndStbActionDelete(SSdb *pSdb, SStbObj *pStb);
......@@ -69,7 +69,7 @@ int32_t mndInitStb(SMnode *pMnode) {
void mndCleanupStb(SMnode *pMnode) {}
static SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
SSdbRaw *mndStbActionEncode(SStbObj *pStb) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SStbObj) + (pStb->numOfColumns + pStb->numOfTags) * sizeof(SSchema) + TSDB_STB_RESERVE_SIZE;
......@@ -343,7 +343,7 @@ static int32_t mndCheckCreateStbReq(SMCreateStbReq *pCreate) {
return -1;
}
SField *pField = taosArrayGet(pCreate->pColumns, 0) ;
SField *pField = taosArrayGet(pCreate->pColumns, 0);
if (pField->type != TSDB_DATA_TYPE_TIMESTAMP) {
terrno = TSDB_CODE_MND_INVALID_STB_OPTION;
return -1;
......@@ -549,12 +549,19 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
SStbObj *pTopicStb = NULL;
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SUserObj *pUser = NULL;
SMCreateStbReq createReq = {0};
if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, &createReq) == NULL) goto CREATE_STB_OVER;
if (tDeserializeSMCreateStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_STB_OVER;
}
mDebug("stb:%s, start to create", createReq.name);
if (mndCheckCreateStbReq(&createReq) != 0) goto CREATE_STB_OVER;
if (mndCheckCreateStbReq(&createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_STB_OVER;
}
pStb = mndAcquireStb(pMnode, createReq.name);
if (pStb != NULL) {
......@@ -582,6 +589,15 @@ static int32_t mndProcessMCreateStbReq(SMnodeMsg *pReq) {
goto CREATE_STB_OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto CREATE_STB_OVER;
}
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto CREATE_STB_OVER;
}
code = mndCreateStb(pMnode, pReq, &createReq, pDb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -593,8 +609,8 @@ CREATE_STB_OVER:
mndReleaseStb(pMnode, pStb);
mndReleaseStb(pMnode, pTopicStb);
mndReleaseDb(pMnode, pDb);
taosArrayDestroy(createReq.pColumns);
taosArrayDestroy(createReq.pTags);
mndReleaseUser(pMnode, pUser);
tFreeSMCreateStbReq(&createReq);
return code;
}
......@@ -965,7 +981,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SMnodeMsg *pReq, const SMAltertbReq *
int32_t code = -1;
STrans *pTrans = NULL;
SField *pField0 = taosArrayGet(pAlter->pFields, 0);
switch (pAlter->alterType) {
case TSDB_ALTER_TABLE_ADD_TAG:
code = mndAddSuperTableTag(pOld, &stbObj, pAlter->pFields, pAlter->numOfFields);
......@@ -1020,9 +1036,13 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
int32_t code = -1;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SUserObj *pUser = NULL;
SMAltertbReq alterReq = {0};
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, &alterReq) == NULL) goto ALTER_STB_OVER;
if (tDeserializeSMAlterStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_STB_OVER;
}
mDebug("stb:%s, start to alter", alterReq.name);
if (mndCheckAlterStbReq(&alterReq) != 0) goto ALTER_STB_OVER;
......@@ -1039,6 +1059,15 @@ static int32_t mndProcessMAlterStbReq(SMnodeMsg *pReq) {
goto ALTER_STB_OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto ALTER_STB_OVER;
}
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto ALTER_STB_OVER;
}
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
......@@ -1049,6 +1078,7 @@ ALTER_STB_OVER:
mndReleaseStb(pMnode, pStb);
mndReleaseDb(pMnode, pDb);
mndReleaseUser(pMnode, pUser);
taosArrayDestroy(alterReq.pFields);
return code;
......@@ -1135,43 +1165,60 @@ DROP_STB_OVER:
}
static int32_t mndProcessMDropStbReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
SUserObj *pUser = NULL;
SDbObj *pDb = NULL;
SStbObj *pStb = NULL;
SMDropStbReq dropReq = {0};
tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, &dropReq);
if (tDeserializeSMDropStbReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_STB_OVER;
}
mDebug("stb:%s, start to drop", dropReq.name);
SStbObj *pStb = mndAcquireStb(pMnode, dropReq.name);
pStb = mndAcquireStb(pMnode, dropReq.name);
if (pStb == NULL) {
if (dropReq.igNotExists) {
mDebug("stb:%s, not exist, ignore not exist is set", dropReq.name);
return 0;
code = 0;
goto DROP_STB_OVER;
} else {
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
goto DROP_STB_OVER;
}
}
SDbObj *pDb = mndAcquireDbByStb(pMnode, dropReq.name);
pDb = mndAcquireDbByStb(pMnode, dropReq.name);
if (pDb == NULL) {
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
goto DROP_STB_OVER;
}
int32_t code = mndDropStb(pMnode, pReq, pDb, pStb);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) {
goto DROP_STB_OVER;
}
if (code != 0) {
if (mndCheckWriteAuth(pUser, pDb) != 0) {
goto DROP_STB_OVER;
}
code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_STB_OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("stb:%s, failed to drop since %s", dropReq.name, terrstr());
return -1;
}
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
mndReleaseUser(pMnode, pUser);
return code;
}
static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {
......@@ -1179,195 +1226,163 @@ static int32_t mndProcessVDropStbRsp(SMnodeMsg *pRsp) {
return 0;
}
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
STableInfoReq *pInfo = pReq->rpcMsg.pCont;
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pInfo->dbFName, pInfo->tbName);
mDebug("stb:%s, start to retrieve meta", tbFName);
SDbObj *pDb = mndAcquireDb(pMnode, pInfo->dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1;
}
static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbName, STableMetaRsp *pRsp) {
taosRLockLatch(&pStb->lock);
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema);
STableMetaRsp *pMeta = rpcMallocCont(contLen);
if (pMeta == NULL) {
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
pRsp->pSchemas = calloc(totalCols, sizeof(SSchema));
if (pRsp->pSchemas == NULL) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("stb:%s, failed to get meta since %s", tbFName, terrstr());
return -1;
}
strcpy(pMeta->dbFName, pStb->db);
strcpy(pMeta->tbName, pInfo->tbName);
strcpy(pMeta->stbName, pInfo->tbName);
pMeta->dbId = htobe64(pDb->uid);
pMeta->numOfTags = htonl(pStb->numOfTags);
pMeta->numOfColumns = htonl(pStb->numOfColumns);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = TSDB_SUPER_TABLE;
pMeta->update = pDb->cfg.update;
pMeta->sversion = htonl(pStb->version);
pMeta->suid = htobe64(pStb->uid);
pMeta->tuid = htobe64(pStb->uid);
strcpy(pRsp->dbFName, pStb->db);
strcpy(pRsp->tbName, tbName);
strcpy(pRsp->stbName, tbName);
pRsp->dbId = pDb->uid;
pRsp->numOfTags = pStb->numOfTags;
pRsp->numOfColumns = pStb->numOfColumns;
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->update = pDb->cfg.update;
pRsp->sversion = pStb->version;
pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid;
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pMeta->pSchema[i];
SSchema *pSchema = &pRsp->pSchemas[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pMeta->pSchema[i + pStb->numOfColumns];
SSchema *pSchema = &pRsp->pSchemas[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
pSchema->colId = pSrcSchema->colId;
pSchema->bytes = pSrcSchema->bytes;
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
pReq->pCont = pMeta;
pReq->contLen = contLen;
mDebug("stb:%s, meta is retrieved, cols:%d tags:%d", tbFName, pStb->numOfColumns, pStb->numOfTags);
return 0;
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
SSdb *pSdb = pMnode->pSdb;
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
void *buf = malloc(bufSize);
int32_t len = 0;
int32_t contLen = 0;
STableMetaRsp *pRsp = NULL;
for (int32_t i = 0; i < num; ++i) {
SSTableMetaVersion *stb = &stbs[i];
stb->suid = be64toh(stb->suid);
stb->sversion = ntohs(stb->sversion);
stb->tversion = ntohs(stb->tversion);
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
bufSize = contLen + (num - i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
pRsp = (STableMetaRsp *)((char *)buf + contLen);
SDbObj *pDb = mndAcquireDb(pMnode, dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
strcpy(pRsp->dbFName, stb->dbFName);
strcpy(pRsp->tbName, stb->stbName);
strcpy(pRsp->stbName, stb->stbName);
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_INVALID_STB;
return -1;
}
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
return code;
}
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
if (pDb == NULL) {
pRsp->numOfColumns = -1;
pRsp->suid = htobe64(stb->suid);
contLen += sizeof(STableMetaRsp);
mWarn("db:%s, failed to require db since %s", stb->dbFName, terrstr());
continue;
}
static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode;
int32_t code = -1;
STableInfoReq infoReq = {0};
STableMetaRsp metaRsp = {0};
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
if (tDeserializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto RETRIEVE_META_OVER;
}
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
pRsp->numOfColumns = -1;
pRsp->suid = htobe64(stb->suid);
contLen += sizeof(STableMetaRsp);
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
continue;
}
mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
goto RETRIEVE_META_OVER;
}
taosRLockLatch(&pStb->lock);
int32_t rspLen = tSerializeSTableMetaRsp(NULL, 0, &metaRsp);
if (rspLen < 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto RETRIEVE_META_OVER;
}
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
continue;
}
void *pRsp = rpcMallocCont(rspLen);
if (pRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto RETRIEVE_META_OVER;
}
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
int32_t len = totalCols * sizeof(SSchema);
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
pReq->pCont = pRsp;
pReq->contLen = rspLen;
code = 0;
contLen += sizeof(STableMetaRsp) + len;
mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName);
if (contLen > bufSize) {
bufSize = contLen + (num - i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
buf = realloc(buf, bufSize);
}
RETRIEVE_META_OVER:
if (code != 0) {
mError("stb:%s.%s, failed to retrieve meta since %s", infoReq.dbFName, infoReq.tbName, terrstr());
}
pRsp->numOfTags = htonl(pStb->numOfTags);
pRsp->numOfColumns = htonl(pStb->numOfColumns);
pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->update = pDb->cfg.update;
pRsp->sversion = htonl(pStb->version);
pRsp->suid = htobe64(pStb->uid);
pRsp->tuid = htobe64(pStb->uid);
for (int32_t i = 0; i < pStb->numOfColumns; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
SSchema *pSrcSchema = &pStb->pColumns[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
tFreeSTableMetaRsp(&metaRsp);
return code;
}
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen) {
STableMetaBatchRsp batchMetaRsp = {0};
batchMetaRsp.pArray = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (batchMetaRsp.pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableMetaVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid);
pStbVersion->sversion = ntohs(pStbVersion->sversion);
pStbVersion->tversion = ntohs(pStbVersion->tversion);
STableMetaRsp metaRsp = {0};
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid;
}
for (int32_t i = 0; i < pStb->numOfTags; ++i) {
SSchema *pSchema = &pRsp->pSchema[i + pStb->numOfColumns];
SSchema *pSrcSchema = &pStb->pTags[i];
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
pSchema->type = pSrcSchema->type;
pSchema->colId = htonl(pSrcSchema->colId);
pSchema->bytes = htonl(pSrcSchema->bytes);
if (pStbVersion->sversion != metaRsp.sversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp);
}
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
int32_t rspLen = tSerializeSTableMetaBatchRsp(NULL, 0, &batchMetaRsp);
if (rspLen < 0) {
tFreeSTableMetaBatchRsp(&batchMetaRsp);
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
if (contLen > 0) {
*rsp = buf;
*rspLen = contLen;
} else {
*rsp = NULL;
tfree(buf);
*rspLen = 0;
void *pRsp = malloc(rspLen);
if (pRsp == NULL) {
tFreeSTableMetaBatchRsp(&batchMetaRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp);
*ppRsp = pRsp;
*pRspLen = rspLen;
return 0;
}
......@@ -1407,33 +1422,33 @@ static int32_t mndGetStbMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *pM
}
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "columns");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "tags");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......@@ -1482,7 +1497,7 @@ static int32_t mndRetrieveStb(SMnodeMsg *pReq, SShowObj *pShow, char *data, int3
if (pShow->pIter == NULL) break;
if (pStb->dbUid != pDb->uid) {
if (strncmp(pStb->db, pDb->name, tListLen(pStb->db)) == 0) {
if (strncmp(pStb->db, pDb->name, prefixLen) == 0) {
mError("Inconsistent table data, name:%s, db:%s, dbUid:%" PRIu64, pStb->name, pDb->name, pDb->uid);
}
......
......@@ -302,7 +302,10 @@ static int32_t mndProcessCreateUserReq(SMnodeMsg *pReq) {
SUserObj *pOperUser = NULL;
SCreateUserReq createReq = {0};
if (tDeserializeSCreateUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) goto CREATE_USER_OVER;
if (tDeserializeSCreateUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_USER_OVER;
}
mDebug("user:%s, start to create", createReq.user);
......@@ -402,7 +405,10 @@ static int32_t mndProcessAlterUserReq(SMnodeMsg *pReq) {
SUserObj newUser = {0};
SAlterUserReq alterReq = {0};
if (tDeserializeSAlterUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) goto ALTER_USER_OVER;
if (tDeserializeSAlterUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &alterReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto ALTER_USER_OVER;
}
mDebug("user:%s, start to alter", alterReq.user);
......@@ -537,7 +543,10 @@ static int32_t mndProcessDropUserReq(SMnodeMsg *pReq) {
SUserObj *pOperUser = NULL;
SDropUserReq dropReq = {0};
if (tDeserializeSDropUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) goto DROP_USER_OVER;
if (tDeserializeSDropUserReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto DROP_USER_OVER;
}
mDebug("user:%s, start to drop", dropReq.user);
......@@ -583,7 +592,10 @@ static int32_t mndProcessGetUserAuthReq(SMnodeMsg *pReq) {
SGetUserAuthReq authReq = {0};
SGetUserAuthRsp authRsp = {0};
if (tDeserializeSGetUserAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) goto GET_AUTH_OVER;
if (tDeserializeSGetUserAuthReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &authReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
goto GET_AUTH_OVER;
}
mTrace("user:%s, start to get auth", authReq.user);
......@@ -640,33 +652,33 @@ static int32_t mndGetUserMeta(SMnodeMsg *pReq, SShowObj *pShow, STableMetaRsp *p
SSdb *pSdb = pMnode->pSdb;
int32_t cols = 0;
SSchema *pSchema = pMeta->pSchema;
SSchema *pSchema = pMeta->pSchemas;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "name");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 10 + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "privilege");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = 8;
pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP;
strcpy(pSchema[cols].name, "create_time");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pShow->bytes[cols] = TSDB_USER_LEN + VARSTR_HEADER_SIZE;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "account");
pSchema[cols].bytes = htonl(pShow->bytes[cols]);
pSchema[cols].bytes = pShow->bytes[cols];
cols++;
pMeta->numOfColumns = htonl(cols);
pMeta->numOfColumns = cols;
pShow->numOfColumns = cols;
pShow->offset[0] = 0;
......
......@@ -54,12 +54,14 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
}
TEST_F(MndTestShow, 03_ShowMsg_Conn) {
int32_t contLen = sizeof(SConnectReq);
SConnectReq connectReq = {0};
connectReq.pid = 1234;
strcpy(connectReq.app, "mnode_test_show");
strcpy(connectReq.db, "");
SConnectReq* pReq = (SConnectReq*)rpcMallocCont(contLen);
pReq->pid = htonl(1234);
strcpy(pReq->app, "mnode_test_show");
strcpy(pReq->db, "");
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connectReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connectReq);
SRpcMsg* pRsp = test.SendReq(TDMT_MND_CONNECT, pReq, contLen);
ASSERT_NE(pRsp, nullptr);
......
此差异已折叠。
......@@ -1440,7 +1440,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
SCtgUpdateTblMsg *msg = NULL;
STableMetaOutput moutput = {0};
STableMetaOutput *output = malloc(sizeof(STableMetaOutput));
STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
if (NULL == output) {
ctgError("malloc %d failed", (int32_t)sizeof(STableMetaOutput));
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册