提交 642aa478 编写于 作者: M Minglei Jin

last: first round implementation for column loading

上级 76edc16c
......@@ -195,7 +195,8 @@ uint64_t getReaderMaxVersion(STsdbReader *pReader);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, uint64_t suid, void **pReader, const char *idstr);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);
int32_t tsdbGetTableSchema(SVnode *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid);
......
......@@ -808,6 +808,7 @@ typedef struct {
int32_t tsdbOpenCache(STsdb *pTsdb);
void tsdbCloseCache(STsdb *pTsdb);
int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row);
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray **ppLastArray, SCacheRowsReader *pr, char const *lstring);
int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb);
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, STsdb *pTsdb, tb_uid_t uid, TSDBROW *row, bool dup);
int32_t tsdbCacheGetLastH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **h);
......
......@@ -194,7 +194,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow
tsdbRowClose(&iter);
// 3, build keys & multi get from rocks
int max_key_len = 0;
int num_keys = TARRAY_SIZE(aColVal);
char **keys_list = taosMemoryCalloc(num_keys * 2, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys * 2, sizeof(size_t));
......@@ -283,11 +282,49 @@ _exit:
return code;
}
int32_t tsdbCacheGetLast(STsdb *pTsdb, tb_uid_t uid, SArray **ppLastArray, SCacheRowsReader *pr) {
int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray **ppLastArray, SCacheRowsReader *pr, char const *lstring) {
int32_t code = 0;
SArray *pCidList = pr->pCidList;
int num_keys = TARRAY_SIZE(pCidList);
char **keys_list = taosMemoryCalloc(num_keys, sizeof(char *));
size_t *keys_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
for (int i = 0; i < num_keys; ++i) {
int16_t cid = *(int16_t *)taosArrayGet(pCidList, i);
char *keys = taosMemoryCalloc(2, ROCKS_KEY_LEN);
int last_key_len = snprintf(keys, ROCKS_KEY_LEN, "%" PRIi64 ":%" PRIi16 ":%s", uid, cid, lstring);
if (last_key_len >= ROCKS_KEY_LEN) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, __LINE__, tstrerror(code));
}
keys_list[i] = keys;
keys_list_sizes[i] = last_key_len;
}
char **values_list = taosMemoryCalloc(num_keys, sizeof(char *));
size_t *values_list_sizes = taosMemoryCalloc(num_keys, sizeof(size_t));
char **errs = taosMemoryCalloc(num_keys, sizeof(char *));
rocksdb_multi_get(pTsdb->rCache.db, pTsdb->rCache.readoptions, num_keys, (const char *const *)keys_list,
keys_list_sizes, values_list, values_list_sizes, errs);
for (int i = 0; i < num_keys; ++i) {
taosMemoryFree(keys_list[i]);
}
taosMemoryFree(keys_list);
taosMemoryFree(keys_list_sizes);
taosMemoryFree(errs);
SArray *pLastArray = taosArrayInit(num_keys, sizeof(SLastCol));
for (int i = 0; i < num_keys; ++i) {
SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]);
taosArrayPush(pLastArray, pLastCol);
taosMemoryFree(values_list[i]);
}
taosMemoryFree(values_list);
taosMemoryFree(values_list_sizes);
*ppLastArray = pLastArray;
return code;
}
......
......@@ -21,16 +21,16 @@
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
void** pRes, const char* idStr) {
const int32_t* dstSlotIds, void** pRes, const char* idStr) {
int32_t numOfRows = pBlock->info.rows;
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
bool allNullRow = true;
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, dstSlotIds[i]);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[dstSlotIds[i]]);
/*
if (slotIds[i] == -1) { // the primary timestamp
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
p->ts = pColVal->ts;
......@@ -38,16 +38,19 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
*(int64_t*)p->buf = pColVal->ts;
allNullRow = false;
} else {
*/
int32_t slotId = slotIds[i];
// add check for null value, caused by the modification of table schema (new column added).
/*
if (slotId >= taosArrayGetSize(pRow)) {
p->ts = 0;
p->isNull = true;
colDataSetNULL(pColInfoData, numOfRows);
continue;
}
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
*/
// SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, i);
p->ts = pColVal->ts;
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
......@@ -63,12 +66,12 @@ static int32_t saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* p
p->bytes = pReader->pSchema->columns[slotId].bytes;
}
}
}
//}
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
p->hasResult = true;
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[i], false);
varDataSetLen(pRes[dstSlotIds[i]], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
colDataSetVal(pColInfoData, numOfRows, (const char*)pRes[dstSlotIds[i]], false);
}
pBlock->info.rows += allNullRow ? 0 : 1;
......@@ -278,7 +281,8 @@ static int32_t tsdbCacheQueryReseek(void* pQHandle) {
}
}
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, const int32_t* dstSlotIds,
SArray* pTableUidList) {
if (pReader == NULL || pResBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
......@@ -422,11 +426,12 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
}
if (hasRes) {
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
}
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
/*
code = doExtractCacheRow(pr, lruCache, pKeyInfo->uid, &pRow, &h);
if (code != TSDB_CODE_SUCCESS) {
goto _end;
......@@ -442,9 +447,16 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
saveOneRow(pRow, pResBlock, pr, slotIds, pRes, pr->idstr);
// TODO reset the pRes
tsdbCacheRelease(lruCache, h);
*/
char const* lstring = pr->type & CACHESCAN_RETRIEVE_LAST ? "last" : "last_row";
tsdbCacheGet(pr->pTsdb, pKeyInfo->uid, &pRow, pr, lstring);
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
taosArrayDestroy(pRow);
taosArrayPush(pTableUidList, &pKeyInfo->uid);
tsdbCacheRelease(lruCache, h);
pr->tableIndex += 1;
if (pResBlock->info.rows >= pResBlock->info.capacity) {
......
......@@ -31,6 +31,7 @@ typedef struct SCacheRowsScanInfo {
void* pLastrowReader;
SColMatchInfo matchInfo;
int32_t* pSlotIds;
int32_t* pDstSlotIds;
SExprSupp pseudoExprSup;
int32_t retrieveType;
int32_t currentGroupIndex;
......@@ -43,7 +44,8 @@ typedef struct SCacheRowsScanInfo {
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
static void destroyCacheScanOperator(void* param);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
int32_t** pDstSlotIds);
static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo);
#define SCAN_ROW_TYPE(_t) ((_t) ? CACHESCAN_RETRIEVE_LAST : CACHESCAN_RETRIEVE_LAST_ROW)
......@@ -81,7 +83,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
removeRedundantTsCol(pScanNode, &pInfo->matchInfo);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds);
code = extractCacheScanSlotId(pInfo->matchInfo.pList, pTaskInfo, &pInfo->pSlotIds, &pInfo->pDstSlotIds);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
......@@ -168,8 +170,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pBufferredRes);
taosArrayClear(pInfo->pUidList);
int32_t code =
tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds, pInfo->pUidList);
int32_t code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pBufferredRes, pInfo->pSlotIds,
pInfo->pDstSlotIds, pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -245,7 +247,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
taosArrayClear(pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pUidList);
code = tsdbRetrieveCacheRows(pInfo->pLastrowReader, pInfo->pRes, pInfo->pSlotIds, pInfo->pDstSlotIds,
pInfo->pUidList);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
......@@ -290,6 +293,7 @@ void destroyCacheScanOperator(void* param) {
blockDataDestroy(pInfo->pRes);
blockDataDestroy(pInfo->pBufferredRes);
taosMemoryFree(pInfo->pSlotIds);
taosMemoryFree(pInfo->pDstSlotIds);
taosArrayDestroy(pInfo->pCidList);
taosArrayDestroy(pInfo->pUidList);
taosArrayDestroy(pInfo->matchInfo.pList);
......@@ -303,7 +307,8 @@ void destroyCacheScanOperator(void* param) {
taosMemoryFreeClear(param);
}
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds,
int32_t** pDstSlotIds) {
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
......@@ -311,18 +316,25 @@ int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTask
return TSDB_CODE_OUT_OF_MEMORY;
}
*pDstSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
if (*pDstSlotIds == NULL) {
taosMemoryFree(*pSlotIds);
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
for (int32_t i = 0; i < numOfCols; ++i) {
SColMatchItem* pColMatch = taosArrayGet(pColMatchInfo, i);
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
/* if (pColMatch->colId == pWrapper->pSchema[j].colId && pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
(*pSlotIds)[pColMatch->dstSlotId] = -1;
break;
}
}*/
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
(*pSlotIds)[pColMatch->dstSlotId] = j;
(*pSlotIds)[i] = j;
(*pDstSlotIds)[i] = pColMatch->dstSlotId;
break;
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册