提交 b2e5a364 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1

......@@ -79,11 +79,11 @@ typedef enum {
} ETsdbSmaType;
typedef enum {
TSDB_RSMA_RETENTION_0 = 0,
TSDB_RSMA_RETENTION_1 = 1,
TSDB_RSMA_RETENTION_2 = 2,
TSDB_RSMA_RETENTION_MAX = 3
} ERSmaRetention;
TSDB_RETENTION_L0 = 0,
TSDB_RETENTION_L1 = 1,
TSDB_RETENTION_L2 = 2,
TSDB_RETENTION_MAX = 3
} ERetentionLevel;
extern char *qtypeStr[];
......
......@@ -225,6 +225,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
void blockDebugShowData(const SArray* dataBlocks);
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
tb_uid_t uid, tb_uid_t suid);
static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock);
}
......
......@@ -202,6 +202,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp)
// sync integration
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_TIMEOUT, "vnode-sync-timeout", NULL, NULL)
......
......@@ -187,17 +187,17 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
int32_t i = 0;
uint8_t* start = (uint8_t*)&pColumnInfoData->nullbitmap[BitmapLen(numOfRow1)];
int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1);
while (i < len) { // size limit of pSource->nullbitmap
int32_t overCount = BitmapLen(total) - BitmapLen(numOfRow1);
while (i < len) { // size limit of pSource->nullbitmap
if (i >= 1) {
start[i - 1] |= (p[i] >> remindBits); //copy remind bits
start[i - 1] |= (p[i] >> remindBits); // copy remind bits
}
if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap
if (i >= overCount) { // size limit of pColumnInfoData->nullbitmap
return;
}
start[i] |= (p[i] << shiftBits); //copy shift bits
start[i] |= (p[i] << shiftBits); // copy shift bits
i += 1;
}
}
......@@ -453,7 +453,6 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd
// all fit in
*stopIndex = numOfRows - 1;
return TSDB_CODE_SUCCESS;
}
SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) {
......@@ -558,7 +557,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) {
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
size_t metaSize = pBlock->info.rows * sizeof(int32_t);
char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small
char* tmp = taosMemoryRealloc(pCol->varmeta.offset, metaSize); // preview calloc is too small
if (tmp == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
......@@ -940,8 +939,9 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
p3 - p2, p4 - p3, rows);
uDebug("blockDataSort complex sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64
", rows:%d\n",
p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
destroyTupleIndex(index);
return TSDB_CODE_SUCCESS;
......@@ -1178,7 +1178,7 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
}
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
if(pDataBlock == NULL){
if (pDataBlock == NULL) {
return NULL;
}
......@@ -1189,7 +1189,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
pBlock->info.numOfCols = numOfCols;
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
pBlock->info.rowSize = pDataBlock->info.rows;
pBlock->info.rowSize = pDataBlock->info.rows;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData colInfo = {0};
......@@ -1219,7 +1219,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
}
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return (int32_t) ((pageSize - blockDataGetSerialMetaSize(pBlock))/ blockDataGetSerialRowSize(pBlock));
return (int32_t)((pageSize - blockDataGetSerialMetaSize(pBlock)) / blockDataGetSerialRowSize(pBlock));
}
void colDataDestroy(SColumnInfoData* pColData) {
......@@ -1236,14 +1236,14 @@ static void doShiftBitmap(char* nullBitmap, size_t n, size_t total) {
int32_t len = BitmapLen(total);
int32_t newLen = BitmapLen(total - n);
if (n%8 == 0) {
memmove(nullBitmap, nullBitmap + n/8, newLen);
if (n % 8 == 0) {
memmove(nullBitmap, nullBitmap + n / 8, newLen);
} else {
int32_t tail = n % 8;
int32_t i = 0;
uint8_t* p = (uint8_t*) nullBitmap;
while(i < len) {
uint8_t* p = (uint8_t*)nullBitmap;
while (i < len) {
uint8_t v = p[i];
p[i] = 0;
......@@ -1270,7 +1270,7 @@ static void colDataTrimFirstNRows(SColumnInfoData* pColInfoData, size_t n, size_
}
}
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n) {
if (n == 0) {
return TSDB_CODE_SUCCESS;
}
......@@ -1278,7 +1278,7 @@ int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n) {
if (pBlock->info.rows <= n) {
blockDataCleanup(pBlock);
} else {
for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
colDataTrimFirstNRows(pColInfoData, n, pBlock->info.rows);
}
......@@ -1464,3 +1464,128 @@ void blockDebugShowData(const SArray* dataBlocks) {
}
}
/**
* @brief TODO: Assume that the final generated result it less than 3M
*
* @param pReq
* @param pDataBlocks
* @param vgId
* @param uid set as parameter temporarily // TODO: remove this parameter, and the executor should set uid in
* SDataBlock->info.uid
* @param suid // TODO: check with Liao whether suid response is reasonable
*
* TODO: colId should be set
*/
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid,
tb_uid_t suid) {
int32_t sz = taosArrayGetSize(pDataBlocks);
int32_t bufSize = sizeof(SSubmitReq);
for (int32_t i = 0; i < sz; ++i) {
SDataBlockInfo* pBlkInfo = &((SSDataBlock*)taosArrayGet(pDataBlocks, i))->info;
bufSize += pBlkInfo->rows * (TD_ROW_HEAD_LEN + pBlkInfo->rowSize + BitmapLen(pBlkInfo->numOfCols));
bufSize += sizeof(SSubmitBlk);
}
ASSERT(bufSize < 3 * 1024 * 1024);
*pReq = taosMemoryCalloc(1, bufSize);
if(!(*pReq)) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
void* pDataBuf = *pReq;
int32_t msgLen = sizeof(SSubmitReq);
int32_t numOfBlks = 0;
SRowBuilder rb = {0};
tdSRowInit(&rb, 0); // TODO: use the latest version
for (int32_t i = 0; i < sz; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pDataBlocks, i);
int32_t colNum = pDataBlock->info.numOfCols;
int32_t rows = pDataBlock->info.rows;
int32_t rowSize = pDataBlock->info.rowSize;
int64_t groupId = pDataBlock->info.groupId;
if(rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
}
SSubmitBlk* pSubmitBlk = POINTER_SHIFT(pDataBuf, msgLen);
pSubmitBlk->suid = suid;
pSubmitBlk->uid = uid;
pSubmitBlk->numOfRows = rows;
++numOfBlks;
msgLen += sizeof(SSubmitBlk);
int32_t dataLen = 0;
for (int32_t j = 0; j < rows; ++j) { // iterate by row
tdSRowResetBuf(&rb, POINTER_SHIFT(pDataBuf, msgLen)); // set row buf
printf("|");
bool isStartKey = false;
for (int32_t k = 0; k < colNum; ++k) { // iterate by column
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
if (!isStartKey) {
isStartKey = true;
tdAppendColValToRow(&rb, PRIMARYKEY_TIMESTAMP_COL_ID, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 0, 0);
} else {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_TIMESTAMP, TD_VTYPE_NORM, var, true, 8, k);
break;
}
break;
case TSDB_DATA_TYPE_NCHAR: {
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_NCHAR, TD_VTYPE_NORM, var, true, 8, k);
break;
}
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
tdAppendColValToRow(&rb, 2, TSDB_DATA_TYPE_VARCHAR, TD_VTYPE_NORM, var, true, 8, k);
break;
}
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
printf("the column type %" PRIi16 " is defined but not implemented yet\n", pColInfoData->info.type);
TASSERT(0);
break;
default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
tdAppendColValToRow(&rb, 2, pColInfoData->info.type, TD_VTYPE_NORM, var, true, 8, k);
} else {
printf("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0);
}
break;
}
}
dataLen += TD_ROW_LEN(rb.pBuf);
}
pSubmitBlk->dataLen = dataLen;
msgLen += pSubmitBlk->dataLen;
}
(*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId);
(*pReq)->header.contLen = htonl(msgLen);
(*pReq)->length = (*pReq)->header.contLen;
(*pReq)->numOfBlocks = htonl(numOfBlks);
SSubmitBlk* blk = (SSubmitBlk*)((*pReq) + 1);
while (numOfBlks--) {
int32_t dataLen = blk->dataLen;
blk->uid = htobe64(blk->uid);
blk->suid = htobe64(blk->suid);
blk->padding = htonl(blk->padding);
blk->sversion = htonl(blk->sversion);
blk->dataLen = htonl(blk->dataLen);
blk->schemaLen = htonl(blk->schemaLen);
blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen);
}
return TSDB_CODE_SUCCESS;
}
......@@ -284,6 +284,7 @@ void vmInitMsgHandle(SMgmtWrapper *pWrapper) {
dmSetMsgHandle(pWrapper, TDMT_VND_CREATE_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CANCEL_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_DROP_SMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_SUBMIT_RSMA, vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_MQ_VG_CHANGE, (NodeMsgFp)vmProcessWriteMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, DEFAULT_HANDLE);
dmSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, DEFAULT_HANDLE);
......
......@@ -37,7 +37,7 @@
#ifdef __cplusplus
extern "C" {
#endif
#define TSDB_VNODE_SMA_DEBUG // TODO: evaluate to remove the macro and the relative codes
// vnode
typedef struct SVnode SVnode;
typedef struct STsdbCfg STsdbCfg; // todo: remove
......@@ -145,7 +145,7 @@ struct STsdbCfg {
int32_t keep2;
// TODO: save to tsdb cfg file
int8_t type; // ETsdbType
SRetention retentions[TSDB_RSMA_RETENTION_MAX];
SRetention retentions[TSDB_RETENTION_MAX];
};
typedef enum {
......
......@@ -72,6 +72,7 @@ struct STsdb {
char *path;
SVnode *pVnode;
bool repoLocked;
int8_t level; // retention level
TdThreadMutex mutex;
STsdbMemTable *mem;
STsdbMemTable *imem;
......
......@@ -124,6 +124,7 @@ int32_t tsdbUpdateTbUidList(STsdb* pTsdb, STbUidStore* pUidStore);
void tsdbUidStoreDestory(STbUidStore* pStore);
void* tsdbUidStoreFree(STbUidStore* pStore);
int32_t tsdbTriggerRSma(STsdb* pTsdb, void* pMsg, int32_t inputType);
int32_t tsdbProcessSubmitReq(STsdb* pTsdb, int64_t version, void* pReq);
typedef struct {
int8_t streamType; // sma or other
......@@ -181,6 +182,7 @@ struct SVnode {
struct STbUidStore {
tb_uid_t suid;
tb_uid_t uid; // TODO: just for debugging, remove when uid provided in SSDataBlock
SArray* tbUids;
SHashObj* uidHash;
};
......
......@@ -27,7 +27,13 @@ static const char *TSDB_FNAME_SUFFIX[] = {
"rsma", // TSDB_FILE_RSMA
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname);
static const char *TSDB_LEVEL_DNAME[] = {
"tsdb",
"rsma1",
"rsma2",
};
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char* dname, char *fname);
// static int tsdbRollBackMFile(SMFile *pMFile);
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
......@@ -45,7 +51,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
pDFile->info.fver = tsdbGetDFSVersion(ftype);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, fname);
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_LEVEL_DNAME[pRepo->level], fname);
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
}
......@@ -431,14 +437,15 @@ int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype
return 0;
}
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname) {
ASSERT(ftype != TSDB_FILE_MAX);
if (ftype < TSDB_FILE_MAX) {
if (ver == 0) {
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s", vid, dname, vid, fid,
TSDB_FNAME_SUFFIX[ftype]);
} else {
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data/v%df%d.%s-ver%" PRIu32, vid, vid, fid,
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s-ver%" PRIu32, vid, dname, vid, fid,
TSDB_FNAME_SUFFIX[ftype], ver);
}
} else {
......
......@@ -15,21 +15,21 @@
#include "tsdb.h"
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir);
static int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level);
int tsdbOpen(SVnode *pVnode, int8_t type) {
switch (type) {
case TSDB_TYPE_TSDB:
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR);
return tsdbOpenImpl(pVnode, type, &VND_TSDB(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
case TSDB_TYPE_TSMA:
ASSERT(0);
break;
case TSDB_TYPE_RSMA_L0:
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR);
return tsdbOpenImpl(pVnode, type, &VND_RSMA0(pVnode), VNODE_TSDB_DIR, TSDB_RETENTION_L0);
case TSDB_TYPE_RSMA_L1:
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR);
return tsdbOpenImpl(pVnode, type, &VND_RSMA1(pVnode), VNODE_RSMA1_DIR, TSDB_RETENTION_L1);
case TSDB_TYPE_RSMA_L2:
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR);
return tsdbOpenImpl(pVnode, type, &VND_RSMA2(pVnode), VNODE_RSMA2_DIR, TSDB_RETENTION_L2);
default:
ASSERT(0);
break;
......@@ -37,7 +37,17 @@ int tsdbOpen(SVnode *pVnode, int8_t type) {
return 0;
}
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
/**
* @brief
*
* @param pVnode
* @param type
* @param ppTsdb
* @param dir
* @param level retention level
* @return int
*/
int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir, int8_t level) {
STsdb *pTsdb = NULL;
int slen = 0;
......@@ -55,6 +65,7 @@ int tsdbOpenImpl(SVnode *pVnode, int8_t type, STsdb **ppTsdb, const char *dir) {
sprintf(pTsdb->path, "%s%s%s%s%s", tfsGetPrimaryPath(pVnode->pTfs), TD_DIRSEP, pVnode->path, TD_DIRSEP,
dir);
pTsdb->pVnode = pVnode;
pTsdb->level = level;
pTsdb->repoLocked = false;
taosThreadMutexInit(&pTsdb->mutex, NULL);
pTsdb->fs = tsdbNewFS(REPO_CFG(pTsdb));
......@@ -79,6 +90,7 @@ _err:
int tsdbClose(STsdb *pTsdb) {
if (pTsdb) {
// TODO: destroy mem/imem
tsdbCloseFS(pTsdb);
tsdbFreeFS(pTsdb->fs);
taosMemoryFree(pTsdb);
......
......@@ -105,7 +105,7 @@ typedef struct {
#define RSMA_TASK_INFO_HASH_SLOT 8
struct SRSmaInfo {
void *taskInfo[TSDB_RSMA_RETENTION_2]; // qTaskInfo_t
void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t
};
struct SSmaStat {
......@@ -127,7 +127,7 @@ static FORCE_INLINE void tsdbFreeTaskHandle(qTaskInfo_t *taskHandle) {
}
static FORCE_INLINE void *tsdbFreeRSmaInfo(SRSmaInfo *pInfo) {
for (int32_t i = 0; i < TSDB_RSMA_RETENTION_MAX; ++i) {
for (int32_t i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (pInfo->taskInfo[i]) {
tsdbFreeTaskHandle(pInfo->taskInfo[i]);
}
......@@ -175,7 +175,8 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg);
static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids);
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention);
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level);
// mgmt interface
static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid);
......@@ -1976,6 +1977,7 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
if (!pBlock) break;
tsdbUidStorePut(pStore, msgIter.suid, NULL);
pStore->uid = msgIter.uid; // TODO: remove, just for debugging
}
if (terrno != TSDB_CODE_SUCCESS) return -1;
......@@ -1983,13 +1985,15 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) {
}
static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType,
qTaskInfo_t *taskInfo, tb_uid_t suid, int8_t retention) {
qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid,
int8_t level) {
SArray *pResult = NULL;
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), retention, taskInfo,
tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo,
suid);
qSetStreamInput(taskInfo, pMsg, inputType);
while (1) {
SSDataBlock *output;
SSDataBlock *output = NULL;
uint64_t ts;
if (qExecTask(taskInfo, &output, &ts) < 0) {
ASSERT(false);
......@@ -2010,22 +2014,36 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg,
if (taosArrayGetSize(pResult) > 0) {
blockDebugShowData(pResult);
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2);
SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), uid, suid) != 0) {
taosArrayDestroy(pResult);
return TSDB_CODE_FAILED;
}
if (tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED;
}
taosMemoryFreeClear(pReq);
} else {
tsdbWarn("vgId:%d no rsma_1 data generated since %s", REPO_ID(pTsdb), tstrerror(terrno));
tsdbWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", REPO_ID(pTsdb), level, tstrerror(terrno));
}
taosArrayDestroy(pResult);
return TSDB_CODE_SUCCESS;
}
int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid) {
static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_uid_t suid, tb_uid_t uid) {
SSmaEnv *pEnv = REPO_RSMA_ENV(pTsdb);
if (!pEnv) {
// only applicable when rsma env exists
return TSDB_CODE_SUCCESS;
}
ASSERT(uid != 0); // TODO: remove later
SSmaStat *pStat = SMA_ENV_STAT(pEnv);
SRSmaInfo *pRSmaInfo = NULL;
......@@ -2037,8 +2055,11 @@ int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType, tb_ui
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], suid, TSDB_RSMA_RETENTION_1);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], suid, TSDB_RSMA_RETENTION_2);
// TODO: use the proper schema instead of 0, and cache STSchema in cache
STSchema *pTSchema = metaGetTbTSchema(pTsdb->pVnode->pMeta, suid, 0);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[0], pTSchema, suid, uid, TSDB_RETENTION_L1);
tsdbExecuteRSmaImpl(pTsdb, pMsg, inputType, pRSmaInfo->taskInfo[1], pTSchema, suid, uid, TSDB_RETENTION_L2);
taosMemoryFree(pTSchema);
}
return TSDB_CODE_SUCCESS;
......@@ -2056,12 +2077,12 @@ int32_t tsdbTriggerRSma(STsdb *pTsdb, void *pMsg, int32_t inputType) {
tsdbFetchSubmitReqSuids(pMsg, &uidStore);
if (uidStore.suid != 0) {
tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid);
tsdbExecuteRSma(pTsdb, pMsg, inputType, uidStore.suid, uidStore.uid);
void *pIter = taosHashIterate(uidStore.uidHash, NULL);
while (pIter) {
tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL);
tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid);
tsdbExecuteRSma(pTsdb, pMsg, inputType, *pTbSuid, 0);
pIter = taosHashIterate(uidStore.uidHash, pIter);
}
......
......@@ -66,7 +66,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
#ifdef TSDB_VNODE_SMA_DEBUG
if (pCfg->tsdbCfg.retentions[0].freq > 0) {
int32_t nRetention = 1;
if (pCfg->tsdbCfg.retentions[1].freq > 0) {
......@@ -87,7 +86,6 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
tjsonAddItemToArray(pNodeRetentions, pNodeRetention);
}
}
#endif
if (tjsonAddIntegerToObject(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
......@@ -135,11 +133,11 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if (tjsonGetNumberValue(pJson, "keep0", pCfg->tsdbCfg.keep0) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep1", pCfg->tsdbCfg.keep1) < 0) return -1;
if (tjsonGetNumberValue(pJson, "keep2", pCfg->tsdbCfg.keep2) < 0) return -1;
#ifdef TSDB_VNODE_SMA_DEBUG
SJson *pNodeRetentions = tjsonGetObjectItem(pJson, "retentions");
int nRetention = tjsonGetArraySize(pNodeRetentions);
ASSERT(nRetention <= TSDB_RSMA_RETENTION_MAX);
int32_t nRetention = tjsonGetArraySize(pNodeRetentions);
if (nRetention > TSDB_RETENTION_MAX) {
nRetention = TSDB_RETENTION_MAX;
}
for (int32_t i = 0; i < nRetention; ++i) {
SJson *pNodeRetention = tjsonGetArrayItem(pNodeRetentions, i);
ASSERT(pNodeRetention != NULL);
......@@ -148,7 +146,6 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
tjsonGetNumberValue(pNodeRetention, "keep", (pCfg->tsdbCfg.retentions)[i].keep);
tjsonGetNumberValue(pNodeRetention, "keepUnit", (pCfg->tsdbCfg.retentions)[i].keepUnit);
}
#endif
if (tjsonGetNumberValue(pJson, "wal.vgId", pCfg->walCfg.vgId) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.fsyncPeriod", pCfg->walCfg.fsyncPeriod) < 0) return -1;
if (tjsonGetNumberValue(pJson, "wal.retentionPeriod", pCfg->walCfg.retentionPeriod) < 0) return -1;
......
......@@ -47,9 +47,26 @@ int vnodeBegin(SVnode *pVnode) {
}
// begin tsdb
if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
if (vnodeIsRollup(pVnode)) {
if (tsdbBegin(VND_RSMA0(pVnode)) < 0) {
vError("vgId:%d failed to begin rsma0 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbBegin(VND_RSMA1(pVnode)) < 0) {
vError("vgId:%d failed to begin rsma1 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
if (tsdbBegin(VND_RSMA2(pVnode)) < 0) {
vError("vgId:%d failed to begin rsma2 since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
} else {
if (tsdbBegin(pVnode->pTsdb) < 0) {
vError("vgId:%d failed to begin tsdb since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}
}
return 0;
......
......@@ -461,7 +461,7 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
SSubmitRsp rsp = {0};
pRsp->code = 0;
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// handle the request
if (tsdbInsertData(pVnode->pTsdb, version, pSubmitReq, &rsp) < 0) {
pRsp->code = terrno;
......@@ -470,12 +470,28 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, in
// pRsp->msgType = TDMT_VND_SUBMIT_RSP;
// vnodeProcessSubmitReq(pVnode, ptr, pRsp);
// tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
// encode the response (TODO)
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
pRsp->contLen = sizeof(SSubmitRsp);
tsdbTriggerRSma(pVnode->pTsdb, pReq, STREAM_DATA_TYPE_SUBMIT_BLOCK);
return 0;
}
int32_t tsdbProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) {
if(!pReq) {
terrno = TSDB_CODE_INVALID_PTR;
return TSDB_CODE_FAILED;
}
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) {
return TSDB_CODE_FAILED;
}
return TSDB_CODE_SUCCESS;
}
......@@ -8,7 +8,7 @@ add_library(executor STATIC ${EXECUTOR_SRC})
# )
target_link_libraries(executor
PRIVATE os util common function parser planner qcom vnode scalar nodes
PRIVATE os util common function parser planner qcom vnode scalar nodes index
)
target_include_directories(
......@@ -19,4 +19,4 @@ target_include_directories(
#if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
#endif(${BUILD_TEST})
\ No newline at end of file
#endif(${BUILD_TEST})
......@@ -15,7 +15,9 @@
#include "indexoperator.h"
#include "executorimpl.h"
#include "index.h"
#include "nodes.h"
#include "tdatablock.h"
typedef struct SIFCtx {
int32_t code;
......@@ -48,11 +50,19 @@ typedef struct SIFCtx {
} while (0)
typedef struct SIFParam {
SArray * result;
SHashObj *pFilter;
SArray *result;
char * condValue;
col_id_t colId;
int64_t suid; // add later
char dbName[TSDB_DB_NAME_LEN];
char colName[TSDB_COL_NAME_LEN];
} SIFParam;
typedef int32_t (*sif_func_t)(SNode *left, SNode *rigth, SIFParam *output);
typedef int32_t (*sif_func_t)(SIFParam *left, SIFParam *rigth, SIFParam *output);
// construct tag filter operator later
static void destroyTagFilterOperatorInfo(void *param) {
STagFilterOperatorInfo *pInfo = (STagFilterOperatorInfo *)param;
......@@ -60,7 +70,10 @@ static void destroyTagFilterOperatorInfo(void *param) {
static void sifFreeParam(SIFParam *param) {
if (param == NULL) return;
taosArrayDestroy(param->result);
taosMemoryFree(param->condValue);
taosHashCleanup(param->pFilter);
}
static int32_t sifGetOperParamNum(EOperatorType ty) {
......@@ -71,15 +84,70 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
}
return 2;
}
static int32_t sifValidateColumn(SColumnNode *cn) {
// add more check
if (cn == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
if (cn->colType != COLUMN_TYPE_TAG) {
return TSDB_CODE_QRY_INVALID_INPUT;
}
return TSDB_CODE_SUCCESS;
}
static int32_t sifGetValueFromNode(SNode *node, char **value) {
// covert data From snode;
SValueNode *vn = (SValueNode *)node;
char * pData = nodesGetValueFromNode(vn);
SDataType *pType = &vn->node.resType;
int32_t type = pType->type;
int32_t valLen = 0;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData);
if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
} else if (*pData == TSDB_DATA_TYPE_BIGINT || *pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = LONG_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
}
dataLen += CHAR_BYTES;
}
valLen = dataLen;
} else {
valLen = pType->bytes;
}
char *tv = taosMemoryCalloc(1, valLen + 1);
if (tv == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
memcpy(tv, pData, valLen);
*value = tv;
return TSDB_CODE_SUCCESS;
}
static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
switch (nodeType(node)) {
case QUERY_NODE_VALUE: {
SValueNode *vn = (SValueNode *)node;
SIF_ERR_RET(sifGetValueFromNode(node, &param->condValue));
param->colId = -1;
break;
}
case QUERY_NODE_COLUMN: {
SColumnNode *cn = (SColumnNode *)node;
/*only support tag column*/
SIF_ERR_RET(sifValidateColumn(cn));
param->colId = cn->colId;
memcpy(param->dbName, cn->dbName, sizeof(cn->dbName));
memcpy(param->colName, cn->colName, sizeof(cn->colName));
break;
}
......@@ -89,7 +157,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
qError("invalid length for node:%p, length: %d", node, LIST_LENGTH(nl->pNodeList));
SIF_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
SIF_ERR_RET(scalarGenerateSetFromList((void **)&param->pFilter, node, nl->dataType.type));
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
taosHashCleanup(param->pFilter);
qError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
......@@ -163,58 +231,63 @@ static int32_t sifExecFunction(SFunctionNode *node, SIFCtx *ctx, SIFParam *outpu
qError("index-filter not support buildin function");
return TSDB_CODE_QRY_INVALID_INPUT;
}
static int32_t sifLessThanFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
static int32_t sifIndex(SIFParam *left, SIFParam *right, int8_t operType, SIFParam *output) {
SIndexMultiTermQuery *mq = indexMultiTermQueryCreate(MUST);
return TSDB_CODE_SUCCESS;
}
static int32_t sifLessEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifLessThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LOWER_THAN;
return sifIndex(left, right, id, output);
}
static int32_t sifGreaterThanFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifLessEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LOWER_EQUAL;
return sifIndex(left, right, id, output);
}
static int32_t sifGreaterEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifGreaterThanFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_GREATER_THAN;
return sifIndex(left, right, id, output);
}
static int32_t sifGreaterEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_GREATER_EQUAL;
return sifIndex(left, right, id, output);
}
static int32_t sifEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_EQUAL;
return sifIndex(left, right, id, output);
}
static int32_t sifNotEqualFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifNotEqualFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_EQUAL;
return sifIndex(left, right, id, output);
}
static int32_t sifInFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_IN;
return sifIndex(left, right, id, output);
}
static int32_t sifNotInFunc(SNode *left, SNode *right, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifNotInFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_IN;
return sifIndex(left, right, id, output);
}
static int32_t sifLikeFunc(SNode *left, SNode *right, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_LIKE;
return sifIndex(left, right, id, output);
}
static int32_t sifNotLikeFunc(SNode *left, SNode *right, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifNotLikeFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NOT_LIKE;
return sifIndex(left, right, id, output);
}
static int32_t sifMatchFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_MATCH;
return sifIndex(left, right, id, output);
}
static int32_t sifNotMatchFunc(SNode *left, SNode *rigth, SIFParam *output) {
// impl later
return TSDB_CODE_SUCCESS;
static int32_t sifNotMatchFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
int id = OP_TYPE_NMATCH;
return sifIndex(left, right, id, output);
}
static int32_t sifDefaultFunc(SNode *left, SNode *rigth, SIFParam *output) {
static int32_t sifDefaultFunc(SIFParam *left, SIFParam *right, SIFParam *output) {
// add more except
return TSDB_CODE_QRY_INVALID_INPUT;
}
......@@ -252,17 +325,18 @@ static sif_func_t sifGetOperFn(int32_t funcId) {
return sifDefaultFunc;
}
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = 0;
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
int32_t code = 0;
int32_t nParam = sifGetOperParamNum(node->opType);
if (nParam <= 1) {
SIF_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
sif_func_t operFn = sifGetOperFn(node->opType);
return operFn(node->pLeft, node->pRight, output);
return operFn(&params[0], nParam > 1 ? &params[1] : NULL, output);
_return:
taosMemoryFree(params);
SIF_RET(code);
......@@ -335,7 +409,6 @@ static EDealRes sifWalkOper(SNode *pNode, void *context) {
if (ctx->code) {
return DEAL_RES_ERROR;
}
if (taosHashPut(ctx->pRes, &pNode, POINTER_BYTES, &output, sizeof(output))) {
ctx->code = TSDB_CODE_QRY_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
......
......@@ -258,13 +258,13 @@ void indexOptsDestroy(SIndexOpts* opts) {
*
*/
SIndexMultiTermQuery* indexMultiTermQueryCreate(EIndexOperatorType opera) {
SIndexMultiTermQuery* p = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
if (p == NULL) {
SIndexMultiTermQuery* mtq = (SIndexMultiTermQuery*)taosMemoryMalloc(sizeof(SIndexMultiTermQuery));
if (mtq == NULL) {
return NULL;
}
p->opera = opera;
p->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return p;
mtq->opera = opera;
mtq->query = taosArrayInit(4, sizeof(SIndexTermQuery));
return mtq;
}
void indexMultiTermQueryDestroy(SIndexMultiTermQuery* pQuery) {
for (int i = 0; i < taosArrayGetSize(pQuery->query); i++) {
......@@ -282,23 +282,24 @@ int indexMultiTermQueryAdd(SIndexMultiTermQuery* pQuery, SIndexTerm* term, EInde
SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colType, const char* colName,
int32_t nColName, const char* colVal, int32_t nColVal) {
SIndexTerm* t = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
if (t == NULL) {
SIndexTerm* tm = (SIndexTerm*)taosMemoryCalloc(1, (sizeof(SIndexTerm)));
if (tm == NULL) {
return NULL;
}
t->suid = suid;
t->operType = oper;
t->colType = colType;
tm->suid = suid;
tm->operType = oper;
tm->colType = colType;
t->colName = (char*)taosMemoryCalloc(1, nColName + 1);
memcpy(t->colName, colName, nColName);
t->nColName = nColName;
tm->colName = (char*)taosMemoryCalloc(1, nColName + 1);
memcpy(tm->colName, colName, nColName);
tm->nColName = nColName;
t->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(t->colVal, colVal, nColVal);
t->nColVal = nColVal;
return t;
tm->colVal = (char*)taosMemoryCalloc(1, nColVal + 1);
memcpy(tm->colVal, colVal, nColVal);
tm->nColVal = nColVal;
return tm;
}
void indexTermDestroy(SIndexTerm* p) {
taosMemoryFree(p->colName);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册