提交 51896439 编写于 作者: L Liu Jicong

extract compressed col data format

上级 c765b0b8
......@@ -17,6 +17,7 @@
#define _TD_COMMON_EP_H_
#include "tcommon.h"
#include "tcompression.h"
#include "tmsg.h"
#ifdef __cplusplus
......@@ -115,11 +116,11 @@ static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uin
static FORCE_INLINE void colDataAppendNNULL(SColumnInfoData* pColumnInfoData, uint32_t start, size_t nRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
for(int32_t i = start; i < start + nRows; ++i) {
for (int32_t i = start; i < start + nRows; ++i) {
pColumnInfoData->varmeta.offset[i] = -1; // it is a null value of VAR type.
}
} else {
for(int32_t i = start; i < start + nRows; ++i) {
for (int32_t i = start; i < start + nRows; ++i) {
colDataSetNull_f(pColumnInfoData->nullbitmap, i);
}
}
......@@ -135,7 +136,8 @@ static FORCE_INLINE int32_t colDataAppendInt8(SColumnInfoData* pColumnInfoData,
}
static FORCE_INLINE int32_t colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v;
}
......@@ -164,7 +166,6 @@ static FORCE_INLINE int32_t colDataAppendDouble(SColumnInfoData* pColumnInfoData
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(double*)p = *(double*)v;
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
uint32_t numOfRow2);
......@@ -200,12 +201,58 @@ void blockDataCleanup(SSDataBlock* pDataBlock);
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
void* blockDataDestroy(SSDataBlock* pBlock);
int32_t blockDataTrimFirstNRows(SSDataBlock *pBlock, size_t n);
int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
int8_t compressed) {
int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t));
int32_t numOfRows = pBlock->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (needCompress) {
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
data += colSizes[col];
(*dataLen) += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]);
}
}
#ifdef __cplusplus
}
#endif
......
......@@ -296,7 +296,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
STqTopic* pTopic = NULL;
int sz = taosArrayGetSize(pConsumer->topics);
int32_t sz = taosArrayGetSize(pConsumer->topics);
for (int32_t i = 0; i < sz; i++) {
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
// TODO race condition
......@@ -316,7 +316,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return 0;
}
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, pTq->pVnode->vgId);
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
pTq->pVnode->vgId);
rsp.reqOffset = pReq->currentOffset;
rsp.skipLogNum = 0;
......@@ -326,7 +327,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO
consumerEpoch = atomic_load_32(&pConsumer->epoch);
if (consumerEpoch > reqEpoch) {
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch);
break;
}
SWalReadHead* pHead;
......@@ -334,10 +336,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset);
break;
}
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if (pHead->msgType == TDMT_VND_SUBMIT) {
......@@ -361,7 +365,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if (taosArrayGetSize(pRes) == 0) {
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
fetchOffset++;
rsp.skipLogNum++;
taosArrayDestroy(pRes);
......@@ -390,7 +395,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
pHead->msgType, consumerId, pReq->epoch);
tmsgSendRsp(pMsg);
taosMemoryFree(pHead);
return 0;
......@@ -421,7 +427,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg->contLen = tlen;
pMsg->code = 0;
tmsgSendRsp(pMsg);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
pReq->epoch);
/*}*/
return 0;
......@@ -432,7 +439,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
terrno = TSDB_CODE_SUCCESS;
tDecodeSMqMVRebReq(msg, &req);
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId);
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
ASSERT(pConsumer);
ASSERT(pConsumer->consumerId == req.oldConsumerId);
......
......@@ -18,14 +18,14 @@
#include "executorimpl.h"
#include "planner.h"
#include "tcompression.h"
#include "tdatablock.h"
#include "tglobal.h"
#include "tqueue.h"
#include "tdatablock.h"
typedef struct SDataDispatchBuf {
int32_t useSize;
int32_t allocSize;
char* pData;
char* pData;
} SDataDispatchBuf;
typedef struct SDataCacheEntry {
......@@ -36,26 +36,25 @@ typedef struct SDataCacheEntry {
} SDataCacheEntry;
typedef struct SDataDispatchHandle {
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataSinkHandle sink;
SDataSinkManager* pManager;
SDataBlockDescNode* pSchema;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
uint64_t useconds;
TdThreadMutex mutex;
STaosQueue* pDataBlocks;
SDataDispatchBuf nextOutput;
int32_t status;
bool queryEnd;
uint64_t useconds;
TdThreadMutex mutex;
} SDataDispatchHandle;
static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSchema) {
static bool needCompress(const SSDataBlock* pData, int32_t numOfCols) {
if (tsCompressColData < 0 || 0 == pData->info.rows) {
return false;
}
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pData->pDataBlock, col);
int32_t colSize = pColRes->info.bytes * pData->info.rows;
int32_t colSize = pColRes->info.bytes * pData->info.rows;
if (NEEDTO_COMPRESS_QUERY(colSize)) {
return true;
}
......@@ -64,51 +63,6 @@ static bool needCompress(const SSDataBlock* pData, const SDataBlockDescNode* pSc
return false;
}
static int32_t compressColData(SColumnInfoData *pColRes, int32_t numOfRows, char *data, int8_t compressed) {
int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))(
pColRes->pData, colSize, numOfRows, data, colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema, char* data, int8_t compressed, int32_t * dataLen) {
int32_t numOfCols = LIST_LENGTH(pSchema->pSlots);
int32_t * colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
*dataLen = (numOfCols * sizeof(int32_t));
int32_t numOfRows = pInput->pData->info.rows;
for (int32_t col = 0; col < numOfCols; ++col) {
SColumnInfoData* pColRes = taosArrayGet(pInput->pData->pDataBlock, col);
// copy the null bitmap
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
size_t metaSize = numOfRows * sizeof(int32_t);
memcpy(data, pColRes->varmeta.offset, metaSize);
data += metaSize;
(*dataLen) += metaSize;
} else {
int32_t len = BitmapLen(numOfRows);
memcpy(data, pColRes->nullbitmap, len);
data += len;
(*dataLen) += len;
}
if (compressed) {
colSizes[col] = compressColData(pColRes, numOfRows, data, compressed);
data += colSizes[col];
(*dataLen) += colSizes[col];
} else {
colSizes[col] = colDataGetLength(pColRes, numOfRows);
(*dataLen) += colSizes[col];
memmove(data, pColRes->pData, colSizes[col]);
data += colSizes[col];
}
colSizes[col] = htonl(colSizes[col]);
}
}
// data format:
// +----------------+--------------------------------------+-------------+-----------+-------------+-----------+
// |SDataCacheEntry | column#1 length, column#2 length ... | col1 bitmap | col1 data | col2 bitmap | col2 data | ....
......@@ -117,16 +71,17 @@ static void copyData(const SInputData* pInput, const SDataBlockDescNode* pSchema
// The length of bitmap is decided by number of rows of this data block, and the length of each column data is
// recorded in the first segment, next to the struct header
static void toDataCacheEntry(const SDataDispatchHandle* pHandle, const SInputData* pInput, SDataDispatchBuf* pBuf) {
int32_t numOfCols = LIST_LENGTH(pHandle->pSchema->pSlots);
SDataCacheEntry* pEntry = (SDataCacheEntry*)pBuf->pData;
pEntry->compressed = (int8_t)needCompress(pInput->pData, pHandle->pSchema);
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pEntry->compressed = (int8_t)needCompress(pInput->pData, numOfCols);
pEntry->numOfRows = pInput->pData->info.rows;
pEntry->dataLen = 0;
pBuf->useSize = sizeof(SRetrieveTableRsp);
copyData(pInput, pHandle->pSchema, pEntry->data, pEntry->compressed, &pEntry->dataLen);
blockCompressEncode(pInput->pData, pEntry->data, &pEntry->dataLen, numOfCols, pEntry->compressed);
pEntry->dataLen = pEntry->dataLen;
pBuf->useSize += pEntry->dataLen;
pBuf->useSize += pEntry->dataLen;
}
static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, SDataDispatchBuf* pBuf) {
......@@ -140,7 +95,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
// NOTE: there are four bytes of an integer more than the required buffer space.
// struct size + data payload + length for each column + bitmap length
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) +
ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);
ceil(blockDataGetSerialRowSize(pInput->pData) * pInput->pData->info.rows);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
......@@ -153,8 +108,9 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
static int32_t updateStatus(SDataDispatchHandle* pDispatcher) {
taosThreadMutexLock(&pDispatcher->mutex);
int32_t blockNums = taosQueueSize(pDispatcher->pDataBlocks);
int32_t status = (0 == blockNums ? DS_BUF_EMPTY :
(blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
int32_t status =
(0 == blockNums ? DS_BUF_EMPTY
: (blockNums < pDispatcher->pManager->cfg.maxDataBlockNumPerQuery ? DS_BUF_LOW : DS_BUF_FULL));
pDispatcher->status = status;
taosThreadMutexUnlock(&pDispatcher->mutex);
return status;
......@@ -169,7 +125,7 @@ static int32_t getStatus(SDataDispatchHandle* pDispatcher) {
static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue) {
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
SDataDispatchBuf* pBuf = taosAllocateQitem(sizeof(SDataDispatchBuf));
if (NULL == pBuf || !allocBuf(pDispatcher, pInput, pBuf)) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -200,7 +156,7 @@ static void getDataLength(SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryE
memcpy(&pDispatcher->nextOutput, pBuf, sizeof(SDataDispatchBuf));
taosFreeQitem(pBuf);
*pLen = ((SDataCacheEntry*)(pDispatcher->nextOutput.pData))->dataLen;
*pQueryEnd = pDispatcher->queryEnd;
*pQueryEnd = pDispatcher->queryEnd;
}
static int32_t getDataBlock(SDataSinkHandle* pHandle, SOutputData* pOutput) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册