提交 fe77479a 编写于 作者: C Cary Xu

CQ adaption to SMemRow

上级 2e5ce3eb
......@@ -348,10 +348,7 @@ typedef struct {
int16_t sversion;
int32_t flen;
// for SKVRow
uint16_t tCols;
uint16_t nCols;
SColIdx* pColIdx;
uint16_t alloc;
uint16_t size;
void* buf;
......@@ -359,40 +356,8 @@ typedef struct {
SSubmitBlk* pSubmitBlk;
} SMemRowBuilder;
// int tdInitMemRowBuilder(SMemRowBuilder* pBuilder);
// void tdDestroyMemRowBuilder(SMemRowBuilder* pBuilder);
// void tdResetMemRowBuilder(SMemRowBuilder* pBuilder);
SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder);
// static FORCE_INLINE int tdAddColToMemRow(SMemRowBuilder* pBuilder, int16_t colId, int8_t type, void* value) {
// // TODO
// if (pBuilder->nCols >= pBuilder->tCols) {
// pBuilder->tCols *= 2;
// pBuilder->pColIdx = (SColIdx*)realloc((void*)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
// if (pBuilder->pColIdx == NULL) return -1;
// }
// pBuilder->pColIdx[pBuilder->nCols].colId = colId;
// pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
// pBuilder->nCols++;
// int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type];
// if (tlen > pBuilder->alloc - pBuilder->size) {
// while (tlen > pBuilder->alloc - pBuilder->size) {
// pBuilder->alloc *= 2;
// }
// pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc);
// if (pBuilder->buf == NULL) return -1;
// }
// memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
// pBuilder->size += tlen;
// return 0;
// }
#ifdef __cplusplus
}
#endif
......
......@@ -1716,6 +1716,7 @@ static FORCE_INLINE uint8_t tdRowTypeJudger(SSchema* pSchema, void* pData, int32
SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
int toffset = 0;
if(pBuilder->nCols <= 0){
return NULL;
......@@ -1723,7 +1724,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
uint16_t nColsNotNull = 0;
uint8_t memRowType = tdRowTypeJudger(pSchema, p, pBuilder->nCols, pBuilder->flen, &nColsNotNull);
tscDebug("prop:memType is %d", memRowType);
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
......@@ -1733,7 +1733,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
int toffset = 0;
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
......@@ -1748,7 +1747,6 @@ SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
kvRowSetLen(kvRow, tlen);
kvRowSetNCols(kvRow, nColsNotNull);
int toffset = 0;
p = (char*)pBuilder->buf;
for (int32_t j = 0; j < pBuilder->nCols; ++j) {
if(!isNull(p, pSchema[j].type)) {
......@@ -1821,22 +1819,6 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
mRowBuilder.size = 0;
for (int32_t i = 0; i < numOfRows; ++i) {
#if 0
SDataRow trow = (SDataRow)pDataBlock; // generate each SDataRow one by one
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + flen));
dataRowSetVersion(trow, pTableMeta->sversion);
// scan each column data and generate the data row
int toffset = 0;
for (int32_t j = 0; j < tinfo.numOfColumns; j++) {
tdAppendColVal(trow, p, pSchema[j].type, pSchema[j].bytes, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p += pSchema[j].bytes;
}
pDataBlock = (char*)pDataBlock + dataRowLen(trow); // next SDataRow
pBlock->dataLen += dataRowLen(trow); // SSubmitBlk data length
#endif
tdGenMemRowFromBuilder(&mRowBuilder);
}
......@@ -1849,7 +1831,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
int32_t result = TD_DATA_ROW_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta);
int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) {
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
......
......@@ -73,7 +73,7 @@ typedef struct {
int8_t type; // Column type
int16_t colId; // column ID
uint16_t bytes; // column bytes
uint16_t offset; // point offset in SDataRow/SKVRow after the header part.
uint16_t offset; // point offset in SDataRow after the header part.
} STColumn;
#define colType(col) ((col)->type)
......@@ -184,20 +184,6 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
return 0;
}
}
// ----------------- Sequential Data row structure
/* A sequential data row, the format is like below:
* |<--------------------+--------------------------- len ---------------------------------->|
* |<-- Head -->|<--------- flen -------------->| |
* +---------------------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
typedef void *SDataRow;
/* A memory data row, the format is like below:
*|---------+---------------------+--------------------------- len ---------------------------------->|
*|<- type->|<-- Head -->|<--------- flen -------------->| |
......@@ -211,10 +197,25 @@ typedef void *SDataRow;
*/
typedef void *SMemRow;
// ----------------- Data row structure
/* A data row, the format is like below:
* |<--------------------+--------------------------- len ---------------------------------->|
* |<-- Head -->|<--------- flen -------------->| |
* +---------------------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | sversion | First part | Second part |
* +----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowVersion(r) *(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
#define dataRowKey(r) tdGetKey(dataRowTKey(r))
......@@ -254,7 +255,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t
}
// NOTE: offset here including the header size
static FORCE_INLINE void *tdGetRowDataOfCol(void *row, int8_t type, int32_t offset) {
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow *row, int8_t type, int32_t offset) {
if (IS_VAR_DATA_TYPE(type)) {
return POINTER_SHIFT(row, *(VarDataOffsetT *)POINTER_SHIFT(row, offset));
} else {
......
......@@ -476,7 +476,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
cDebug("vgId:%d, id:%d CQ:%s stream result is ready", pContext->vgId, pObj->tid, pObj->sqlStr);
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_DATA_ROW_HEAD_SIZE + pObj->rowSize;
int32_t size = sizeof(SWalHead) + sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + TD_MEM_ROW_HEAD_SIZE + pObj->rowSize;
char *buffer = calloc(size, 1);
SWalHead *pHead = (SWalHead *)buffer;
......@@ -484,7 +484,9 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SSubmitBlk *pBlk = (SSubmitBlk *) (buffer + sizeof(SWalHead) + sizeof(SSubmitMsg));
SMemRow trow = (SMemRow)pBlk->data;
tdInitDataRow(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), pSchema);
SDataRow dataRow = (SDataRow)memRowBody(trow);
memRowSetType(trow, SMEM_ROW_DATA);
tdInitDataRow(dataRow, pSchema);
for (int32_t i = 0; i < pSchema->numOfCols; i++) {
STColumn *c = pSchema->columns + i;
......@@ -500,7 +502,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
memcpy((char *)val + sizeof(VarDataLenT), buf, len);
varDataLen(val) = len;
}
tdAppendColVal(POINTER_SHIFT(trow, TD_MEM_ROW_TYPE_SIZE), val, c->type, c->bytes, c->offset);
tdAppendColVal(dataRow, val, c->type, c->bytes, c->offset);
}
pBlk->dataLen = htonl(memRowTLen(trow));
pBlk->schemaLen = 0;
......
......@@ -4002,7 +4002,7 @@ void blockInfo_func(SQLFunctionCtx* pCtx) {
int32_t len = *(int32_t*) pCtx->pInput;
blockDistInfoFromBinary((char*)pCtx->pInput + sizeof(int32_t), len, pDist);
pDist->rowSize = (int16_t) pCtx->param[0].i64;
pDist->rowSize = (uint16_t)pCtx->param[0].i64;
memcpy(pCtx->pOutput, pCtx->pInput, sizeof(int32_t) + len);
......@@ -4149,7 +4149,7 @@ void blockinfo_func_finalizer(SQLFunctionCtx* pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
STableBlockDist* pDist = (STableBlockDist*) GET_ROWCELL_INTERBUF(pResInfo);
pDist->rowSize = (int16_t)pCtx->param[0].i64;
pDist->rowSize = (uint16_t)pCtx->param[0].i64;
generateBlockDistResult(pDist, pCtx->pOutput);
// cannot set the numOfIteratedElems again since it is set during previous iteration
......
......@@ -920,7 +920,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCo
SDataCol * pDataCol = pDataCols->cols + ncol;
SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull;
// if (isNEleNull(pDataCol, rowsToWrite)) { // all data to commit are NULL, just ignore it
if (isAllRowOfColNull(pDataCol)) { // all data to commit are NULL, just ignore it
continue;
}
......
......@@ -1021,10 +1021,9 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro
} else {
// SKVRow
SColIdx *pColIdx = tdGetKVRowIdxOfCol(rowBody, pTCol->colId);
if(pColIdx) {
value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset);
if (pColIdx) {
value = tdGetKvRowDataOfCol(rowBody, pColIdx->offset);
}
}
if ((value == NULL) || isNull(value, pTCol->type)) {
......
......@@ -145,7 +145,7 @@ typedef struct STableGroupSupporter {
static STimeWindow updateLastrowForEachGroup(STableGroupInfo *groupList);
static int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList);
static int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static int32_t tsdbGetCachedLastRow(STable* pTable, SMemRow* pRes, TSKEY* lastKey);
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册