提交 0f801699 编写于 作者: H Hongze Cheng

more code

上级 167c8051
......@@ -103,6 +103,7 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
ASSERT(((SColVal *)aColVal->pData)[0].type == TSDB_DATA_TYPE_TIMESTAMP);
// scan ---------------
SRow *pRow = NULL;
uint8_t flag = 0;
int32_t iColVal = 1;
const int32_t nColVal = taosArrayGetSize(aColVal);
......@@ -196,9 +197,11 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
}
// alloc --------------
code = tRealloc((uint8_t **)ppRow, nRow);
if (code) return code;
SRow *pRow = *ppRow;
pRow = taosMemoryMalloc(nRow);
if (NULL == pRow) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
// build --------------
pColVal = (SColVal *)taosArrayGet(aColVal, 0);
......@@ -349,6 +352,12 @@ int32_t tRowBuild(SArray *aColVal, STSchema *pTSchema, SRow **ppRow) {
}
_exit:
if (code) {
*ppRow = NULL;
tRowDestroy(pRow);
} else {
*ppRow = pRow;
}
return code;
}
......@@ -490,7 +499,9 @@ void tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) {
}
}
void tRowDestroy(SRow *pRow) { tFree((uint8_t *)pRow); }
void tRowDestroy(SRow *pRow) {
if (pRow) taosMemoryFree(pRow);
}
static int32_t tRowPCmprFn(const void *p1, const void *p2) {
if ((*(SRow **)p1)->ts < (*(SRow **)p2)->ts) {
......
......@@ -6665,7 +6665,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
if (tEncodeI64v(pCoder, taosArrayGetSize(pSubmitTbData->aRowP)) < 0) return -1;
for (int32_t i = 0; i < taosArrayGetSize(pSubmitTbData->aRowP); i++) {
SRow *pRow = taosArrayGetP(pSubmitTbData->aRowP, i);
memcpy(pCoder->data + pCoder->pos, pRow, pRow->len);
if (pCoder->data) memcpy(pCoder->data + pCoder->pos, pRow, pRow->len);
pCoder->pos += pRow->len;
}
}
......@@ -6713,10 +6713,8 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
for (int32_t i = 0; i < nRows; i++) {
SRow **ppRow = taosArrayReserve(pSubmitTbData->aRowP, 1);
if (tDecodeBinary(pCoder, (uint8_t **)ppRow, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
*ppRow = (SRow *)(pCoder->data + pCoder->pos);
pCoder->pos += (*ppRow)->len;
}
}
......@@ -6822,13 +6820,18 @@ _exit:
if (code) {
*ppReq = NULL;
if (pReq) {
// todo: do other clear
if (pReq->aCreateTbReq) {
taosArrayDestroy(pReq->aCreateTbReq);
}
if (pReq->aSubmitTbData) {
taosArrayDestroy(pReq->aSubmitTbData);
}
taosMemoryFree(pReq);
}
} else {
*ppReq = pReq;
}
return 0;
return code;
}
void tDestroySSubmitTbData(SSubmitTbData *pTbData) {
......
......@@ -121,7 +121,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
int32_t tRowMergerInit(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema);
void tRowMergerClear(SRowMerger *pMerger);
int32_t tRowMerge(SRowMerger *pMerger, TSDBROW *pRow);
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow);
int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow);
// TABLEID
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
// TSDBKEY
......@@ -336,7 +336,7 @@ typedef struct SMemSkipListNode SMemSkipListNode;
struct SMemSkipListNode {
int8_t level;
int64_t version;
STSRow *pTSRow;
SRow *pTSRow;
SMemSkipListNode *forwards[0];
};
typedef struct SMemSkipList {
......@@ -380,7 +380,7 @@ struct TSDBROW {
union {
struct {
int64_t version;
STSRow *pTSRow;
SRow *pTSRow;
};
struct {
SBlockData *pBlockData;
......@@ -739,8 +739,8 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, SRow *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SRow *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
int32_t tsdbCacheRelease(SLRUCache *pCache, LRUHandle *h);
......@@ -752,7 +752,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
void tsdbCacheSetCapacity(SVnode *pVnode, size_t capacity);
size_t tsdbCacheGetCapacity(SVnode *pVnode);
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pSchema);
// ========== inline functions ==========
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
......
......@@ -160,8 +160,7 @@ int32_t tsdbRollbackCommit(STsdb* pTsdb);
int32_t tsdbDoRetention(STsdb* pTsdb, int64_t now);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitMsgIter* pMsgIter, SSubmitBlk* pBlock,
SSubmitBlkRsp* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, SSubmitBlkRsp* pRsp);
int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey);
int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
......
......@@ -190,7 +190,7 @@ int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey) {
return code;
}
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, STSRow *row, bool dup) {
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, SRow *row, bool dup) {
int32_t code = 0;
STSRow *cacheRow = NULL;
char key[32] = {0};
......@@ -222,7 +222,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
tRowGet(row, pTSchema, iCol, &colVal);
if (!COL_VAL_IS_NONE(&colVal)) {
if (keyTs == tTsVal1->ts && !COL_VAL_IS_NONE(tColVal)) {
invalidate = true;
......@@ -316,7 +316,7 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, ST
return code;
}
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb *pTsdb) {
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, SRow *row, STsdb *pTsdb) {
int32_t code = 0;
STSRow *cacheRow = NULL;
char key[32] = {0};
......@@ -348,7 +348,7 @@ int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, STSRow *row, STsdb
SColVal *tColVal = &tTsVal1->colVal;
SColVal colVal = {0};
tTSRowGetVal(row, pTSchema, iCol, &colVal);
tRowGet(row, pTSchema, iCol, &colVal);
if (!COL_VAL_IS_VALUE(&colVal)) {
if (keyTs == tTsVal1->ts && COL_VAL_IS_VALUE(tColVal)) {
invalidate = true;
......@@ -1456,29 +1456,29 @@ int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *
return code;
}
int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
int32_t code = 0;
int16_t nCol = taosArrayGetSize(pLastArray);
SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
// int32_t tsdbCacheLastArray2Row(SArray *pLastArray, STSRow **ppRow, STSchema *pTSchema) {
// int32_t code = 0;
// int16_t nCol = taosArrayGetSize(pLastArray);
// SArray *pColArray = taosArrayInit(nCol, sizeof(SColVal));
for (int16_t iCol = 0; iCol < nCol; ++iCol) {
SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol);
SColVal *tColVal = &tTsVal->colVal;
taosArrayPush(pColArray, tColVal);
}
// for (int16_t iCol = 0; iCol < nCol; ++iCol) {
// SLastCol *tTsVal = (SLastCol *)taosArrayGet(pLastArray, iCol);
// SColVal *tColVal = &tTsVal->colVal;
// taosArrayPush(pColArray, tColVal);
// }
code = tdSTSRowNew(pColArray, pTSchema, ppRow);
if (code) goto _err;
// code = tdSTSRowNew(pColArray, pTSchema, ppRow);
// if (code) goto _err;
taosArrayDestroy(pColArray);
// taosArrayDestroy(pColArray);
return code;
// return code;
_err:
taosArrayDestroy(pColArray);
// _err:
// taosArrayDestroy(pColArray);
return code;
}
// return code;
// }
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) {
int32_t code = 0;
......
......@@ -29,7 +29,7 @@
static void tbDataMovePosTo(STbData *pTbData, SMemSkipListNode **pos, TSDBKEY *pKey, int32_t flags);
static int32_t tsdbGetOrCreateTbData(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid, STbData **ppTbData);
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp);
SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp);
int32_t tsdbMemTableCreate(STsdb *pTsdb, SMemTable **ppMemTable) {
int32_t code = 0;
......@@ -95,13 +95,12 @@ STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t
return pTbData;
}
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock,
SSubmitBlkRsp *pRsp) {
int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) {
int32_t code = 0;
SMemTable *pMemTable = pTsdb->mem;
STbData *pTbData = NULL;
tb_uid_t suid = pMsgIter->suid;
tb_uid_t uid = pMsgIter->uid;
tb_uid_t suid = pSubmitTbData->suid;
tb_uid_t uid = pSubmitTbData->uid;
SMetaInfo info;
code = metaGetInfo(pTsdb->pVnode->pMeta, uid, &info, NULL);
......@@ -116,14 +115,14 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
if (info.suid) {
metaGetInfo(pTsdb->pVnode->pMeta, info.suid, &info, NULL);
}
if (pMsgIter->sversion != info.skmVer) {
if (pSubmitTbData->sver != info.skmVer) {
tsdbError("vgId:%d, req sver:%d, skmVer:%d suid:%" PRId64 " uid:%" PRId64, TD_VID(pTsdb->pVnode),
pMsgIter->sversion, info.skmVer, suid, uid);
pSubmitTbData->sver, info.skmVer, suid, uid);
code = TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER;
goto _err;
}
pRsp->sver = info.skmVer;
if (pRsp) pRsp->sver = info.skmVer;
// create/get STbData to op
code = tsdbGetOrCreateTbData(pMemTable, suid, uid, &pTbData);
......@@ -132,7 +131,7 @@ int32_t tsdbInsertTableData(STsdb *pTsdb, int64_t version, SSubmitMsgIter *pMsgI
}
// do insert impl
code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pMsgIter, pBlock, pRsp);
code = tsdbInsertTableDataImpl(pMemTable, pTbData, version, pSubmitTbData, pRsp);
if (code) {
goto _err;
}
......@@ -468,8 +467,8 @@ static FORCE_INLINE int8_t tsdbMemSkipListRandLevel(SMemSkipList *pSl) {
return level;
}
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version,
STSRow *pRow, int8_t forward) {
static int32_t tbDataDoPut(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode **pos, int64_t version, SRow *pRow,
int8_t forward) {
int32_t code = 0;
int8_t level;
SMemSkipListNode *pNode;
......@@ -538,23 +537,21 @@ _exit:
}
static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, int64_t version,
SSubmitMsgIter *pMsgIter, SSubmitBlk *pBlock, SSubmitBlkRsp *pRsp) {
SSubmitTbData *pSubmitTbData, SSubmitBlkRsp *pRsp) {
int32_t code = 0;
SSubmitBlkIter blkIter = {0};
// SSubmitBlkIter blkIter = {0};
TSDBKEY key = {.version = version};
SMemSkipListNode *pos[SL_MAX_LEVEL];
TSDBROW row = tsdbRowFromTSRow(version, NULL);
int32_t nRow = 0;
STSRow *pLastRow = NULL;
tInitSubmitBlkIter(pMsgIter, pBlock, &blkIter);
int32_t nRow = taosArrayGetSize(pSubmitTbData->aRowP);
int32_t iRow = 0;
SRow *pLastRow = NULL;
// backward put first data
row.pTSRow = tGetSubmitBlkNext(&blkIter);
if (row.pTSRow == NULL) return code;
row.pTSRow = taosArrayGetP(pSubmitTbData->aRowP, iRow);
key.ts = row.pTSRow->ts;
nRow++;
iRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD);
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 0);
if (code) {
......@@ -566,17 +563,19 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pLastRow = row.pTSRow;
// forward put rest data
row.pTSRow = tGetSubmitBlkNext(&blkIter);
if (row.pTSRow) {
if (iRow < nRow) {
for (int8_t iLevel = pos[0]->level; iLevel < pTbData->sl.maxLevel; iLevel++) {
pos[iLevel] = SL_NODE_BACKWARD(pos[iLevel], iLevel);
}
do {
while (iRow < nRow) {
row.pTSRow = taosArrayGetP(pSubmitTbData->aRowP, iRow);
key.ts = row.pTSRow->ts;
nRow++;
if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, version, row.pTSRow, 1);
if (code) {
goto _err;
......@@ -584,8 +583,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pLastRow = row.pTSRow;
row.pTSRow = tGetSubmitBlkNext(&blkIter);
} while (row.pTSRow);
iRow++;
}
}
if (key.ts >= pTbData->maxKey) {
......@@ -607,8 +606,8 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
pMemTable->maxKey = TMAX(pMemTable->maxKey, pTbData->maxKey);
pMemTable->nRow += nRow;
pRsp->numOfRows = nRow;
pRsp->affectedRows = nRow;
if (pRsp) pRsp->numOfRows = nRow;
if (pRsp) pRsp->affectedRows = nRow;
return code;
......
......@@ -186,7 +186,7 @@ static int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STabl
SRowMerger* pMerger, SVersionRange* pVerRange);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, SRowMerger* pMerger,
STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pInfo);
static int32_t doAppendRowFromFileBlock(SSDataBlock* pResBlock, STsdbReader* pReader, SBlockData* pBlockData,
int32_t rowIndex);
......@@ -194,10 +194,10 @@ static void setComposedBlockFlag(STsdbReader* pReader, bool composed);
static bool hasBeenDropped(const SArray* pDelList, int32_t* index, TSDBKEY* pKey, int32_t order,
SVersionRange* pVerRange);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList,
STSRow** pTSRow, STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow);
static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo,
STsdbReader* pReader, STSRow** pTSRow);
STsdbReader* pReader, SRow** pTSRow);
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader);
......@@ -242,7 +242,7 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SSDataBlock* pB
static void updateBlockSMAInfo(STSchema* pSchema, SBlockLoadSuppInfo* pSupInfo) {
int32_t i = 0, j = 0;
while(i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
while (i < pSchema->numOfCols && j < pSupInfo->numOfCols) {
STColumn* pTCol = &pSchema->columns[i];
if (pTCol->colId == pSupInfo->colIds[j]) {
if (!IS_BSMA_ON(pTCol)) {
......@@ -305,7 +305,8 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) {
}
// NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) {
static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList,
int32_t numOfTables) {
// allocate buffer in order to load data blocks from file
// todo use simple hash instead, optimize the memory consumption
SHashObj* pTableMap =
......@@ -754,7 +755,6 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
pReader->cost.numOfBlocks += total;
pReader->cost.headFileLoadTime += el;
......@@ -960,7 +960,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
p = pData->pData + tDataTypes[pData->type].bytes * startIndex;
}
int32_t step = asc? 1:-1;
int32_t step = asc ? 1 : -1;
// make sure it is aligned to 8bit
ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0);
......@@ -970,12 +970,11 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
// 2. reverse the array list in case of descending order scan data block
if (!asc) {
switch(pColData->info.type) {
switch (pColData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
{
case TSDB_DATA_TYPE_UBIGINT: {
int32_t mid = dumpedRows >> 1u;
int64_t* pts = (int64_t*)pColData->pData;
for (int32_t j = 0; j < mid; ++j) {
......@@ -1730,7 +1729,7 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
SIterInfo* pIter, int64_t key, SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
SBlockData* pBlockData = &pReader->status.fileBlockData;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
......@@ -1876,7 +1875,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader,
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader);
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
SRowMerger merge = {0};
TSDBROW fRow = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
tsdbTrace("fRow ptr:%p, %d, uid:%" PRIu64 ", %s", fRow.pBlockData, fRow.iRow, pLastBlockReader->uid, pReader->idStr);
......@@ -1953,7 +1952,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
if (key < ts) { // imem, mem are all empty, file blocks (data blocks and last block) exist
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
} else if (key == ts) {
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
SRowMerger merge = {0};
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
......@@ -1993,7 +1992,7 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader
static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, SBlockData* pBlockData,
SLastBlockReader* pLastBlockReader) {
SRowMerger merge = {0};
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
int32_t code = TSDB_CODE_SUCCESS;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SArray* pDelList = pBlockScanInfo->delSkyline;
......@@ -2354,7 +2353,7 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} else {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
SRowMerger merge = {0};
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
......@@ -3354,7 +3353,7 @@ int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockSc
return TSDB_CODE_SUCCESS;
}
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, STSRow** pTSRow,
int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter, SArray* pDelList, SRow** pTSRow,
STsdbReader* pReader, bool* freeTSRow) {
TSDBROW* pNextRow = NULL;
TSDBROW current = *pRow;
......@@ -3423,7 +3422,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
}
int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader,
STSRow** pTSRow) {
SRow** pTSRow) {
SRowMerger merge = {0};
TSDBKEY k = TSDBROW_KEY(pRow);
......@@ -3476,7 +3475,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
return code;
}
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow, int64_t endKey,
int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, SRow** pTSRow, int64_t endKey,
bool* freeTSRow) {
TSDBROW* pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pBlockScanInfo->delSkyline, pReader);
......@@ -3533,8 +3532,7 @@ int32_t tsdbGetNextRowInMem(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pR
return TSDB_CODE_SUCCESS;
}
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow* pTSRow,
STableBlockScanInfo* pScanInfo) {
int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, STableBlockScanInfo* pScanInfo) {
int32_t numOfRows = pBlock->info.rows;
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
int64_t uid = pScanInfo->uid;
......@@ -3556,7 +3554,7 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, STSRow*
col_id_t colId = pColInfoData->info.colId;
if (colId == pSchema->columns[j].colId) {
tTSRowGetVal(pTSRow, pSchema, j, &colVal);
tRowGet(pTSRow, pSchema, j, &colVal);
doCopyColVal(pColInfoData, numOfRows, i, &colVal, pSupInfo);
i += 1;
j += 1;
......@@ -3633,7 +3631,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
SSDataBlock* pBlock = pReader->pResBlock;
do {
STSRow* pTSRow = NULL;
SRow* pTSRow = NULL;
bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &pTSRow, endKey, &freeTSRow);
if (pTSRow == NULL) {
......@@ -3789,7 +3787,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL
updateBlockSMAInfo(pReader->pSchema, &pReader->suppInfo);
}
STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader;
STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader;
pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables);
if (pReader->status.pTableMap == NULL) {
tsdbReaderClose(p);
......@@ -4135,7 +4133,7 @@ int32_t tsdbRetrieveDatablockSMA(STsdbReader* pReader, SColumnDataAgg*** pBlockS
taosArrayAddAll(pSup->pColAgg, pNewAggList);
size_t num = taosArrayGetSize(pSup->pColAgg);
for(int32_t k = 0; k < num; ++k) {
for (int32_t k = 0; k < num; ++k) {
pSup->plist[k] = taosArrayGet(pSup->pColAgg, k);
}
......
......@@ -573,7 +573,7 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
ASSERT(iCol > 0);
if (pRow->type == 0) {
tTSRowGetVal(pRow->pTSRow, pTSchema, iCol, pColVal);
tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal);
} else if (pRow->type == 1) {
SColData *pColData;
......@@ -621,7 +621,7 @@ void tsdbRowIterInit(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) {
SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
if (pIter->pRow->type == 0) {
if (pIter->i < pIter->pTSchema->numOfCols) {
tTSRowGetVal(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
tRowGet(pIter->pRow->pTSRow, pIter->pTSchema, pIter->i, &pIter->colVal);
pIter->i++;
return &pIter->colVal;
......@@ -807,12 +807,8 @@ _exit:
return code;
}
int32_t tRowMergerGetRow(SRowMerger *pMerger, STSRow **ppRow) {
int32_t code = 0;
code = tdSTSRowNew(pMerger->pArray, pMerger->pTSchema, ppRow);
return code;
int32_t tRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) {
return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow);
}
// delete skyline ======================================================
......@@ -1247,15 +1243,16 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
SColVal cv = {0};
if (pRow->type == 0) {
if (TD_IS_TP_ROW(pRow->pTSRow)) {
code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema);
if (code) goto _err;
} else if (TD_IS_KV_ROW(pRow->pTSRow)) {
code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema);
if (code) goto _err;
} else {
ASSERT(0);
}
// if (TD_IS_TP_ROW(pRow->pTSRow)) {
// code = tBlockDataAppendTPRow(pBlockData, pRow->pTSRow, pTSchema);
// if (code) goto _err;
// } else if (TD_IS_KV_ROW(pRow->pTSRow)) {
// code = tBlockDataAppendKVRow(pBlockData, pRow->pTSRow, pTSchema);
// if (code) goto _err;
// } else {
// ASSERT(0);
// }
} else {
code = tBlockDataAppendBlockRow(pBlockData, pRow->pBlockData, pRow->iRow);
if (code) goto _err;
......
......@@ -77,52 +77,37 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
tDecoderClear(&dc);
} break;
case TDMT_VND_SUBMIT: {
SSubmitMsgIter msgIter = {0};
SSubmitReq *pSubmitReq = (SSubmitReq *)pMsg->pCont;
SSubmitBlk *pBlock = NULL;
int64_t ctime = taosGetTimestampMs();
tb_uid_t uid;
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
tStartDecode(&dc);
if (tInitSubmitMsgIter(pSubmitReq, &msgIter) < 0) {
code = terrno;
goto _err;
}
int32_t flag;
tDecodeI32v(&dc, &flag);
for (;;) {
tGetSubmitMsgNext(&msgIter, &pBlock);
if (pBlock == NULL) break;
if (flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
int64_t ctime = taosGetTimestampMs();
int64_t nReq;
int64_t uid;
if (msgIter.schemaLen > 0) {
tDecodeI64v(&dc, &nReq);
for (int64_t iReq; iReq < nReq; iReq++) {
char *name = NULL;
tDecoderInit(&dc, pBlock->data, msgIter.schemaLen);
if (tStartDecode(&dc) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
if (tDecodeI32v(&dc, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
if (tDecodeCStr(&dc, &name) < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
tStartDecode(&dc);
tDecodeI32v(&dc, NULL);
tDecodeCStr(&dc, &name);
uid = metaGetTableEntryUidByName(pVnode->pMeta, name);
if (uid == 0) {
uid = tGenIdPI64();
}
*(int64_t *)(dc.data + dc.pos) = uid;
*(int64_t *)(dc.data + dc.pos + 8) = ctime;
pBlock->uid = htobe64(uid);
tEndDecode(&dc);
tDecoderClear(&dc);
}
}
tEndDecode(&dc);
} break;
case TDMT_VND_DELETE: {
int32_t size;
......@@ -855,6 +840,28 @@ static int32_t vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const
}
static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
#if 1
SDecoder dc = {0};
SSubmitReq2 *pSubmitReq = NULL;
tDecoderInit(&dc, (char *)pReq + sizeof(SMsgHead), len - sizeof(SMsgHead));
tDecodeSSubmitReq2(&dc, &pSubmitReq);
if (pSubmitReq->flag & SUBMIT_REQ_AUTO_CREATE_TABLE) {
// todo
ASSERT(0);
}
for (int32_t iSubmitTbData = 0; iSubmitTbData < taosArrayGetSize(pSubmitReq->aSubmitTbData); iSubmitTbData++) {
SSubmitTbData *pSubmitTbData = taosArrayGet(pSubmitReq->aSubmitTbData, iSubmitTbData);
int32_t code = tsdbInsertTableData(pVnode->pTsdb, version, pSubmitTbData, NULL /*todo*/);
if (code) {
// todo
}
}
#else
SSubmitReq *pSubmitReq = (SSubmitReq *)pReq;
SSubmitRsp submitRsp = {0};
SSubmitMsgIter msgIter = {0};
......@@ -1007,6 +1014,8 @@ _exit:
vDebug("vgId:%d, submit success, index:%" PRId64, pVnode->config.vgId, version);
return 0;
#endif
return 0;
}
static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册