未验证 提交 b3dca3eb 编写于 作者: wmmhello's avatar wmmhello 提交者: GitHub

Merge pull request #13176 from taosdata/feature/TD-14222-new

feat:add encode/decode for operator
......@@ -156,13 +156,11 @@ typedef struct STaskAttr {
} STaskAttr;
struct SOperatorInfo;
struct SAggSupporter;
struct SOptrBasicInfo;
//struct SAggSupporter;
//struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char** result, int32_t* length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
......@@ -445,14 +443,16 @@ typedef struct STimeWindowSupp {
} STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -463,19 +463,23 @@ typedef struct SIntervalAggOperatorInfo {
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
SAggSupporter aggSup; // aggregate supporter
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
......@@ -488,8 +492,10 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SSDataBlock* existDataBlock;
SArray* pPseudoColInfo;
SLimit limit;
......@@ -513,7 +519,10 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
......@@ -521,7 +530,6 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SAggSupporter aggSup;
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
......@@ -558,8 +566,10 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup;
typedef struct SSessionAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
......@@ -598,8 +608,10 @@ typedef struct STimeSliceOperatorInfo {
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
......@@ -611,8 +623,10 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
......@@ -624,7 +638,6 @@ typedef struct SSortedMergeOperatorInfo {
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
......@@ -786,16 +799,31 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
/*
* ops: root operator
* data: *data save the result of encode, need to be freed by caller
* length: *length save the length of *data
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length);
/*
* ops: root operator, created by caller
* data: save the result of decode
* length: the length of data
* return: result code, 0 means success
*/
int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model);
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn,
......
......@@ -3498,17 +3498,24 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pInfo->pRes;
}
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length) {
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
if(result == NULL || length == NULL){
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = taosMemoryCalloc(1, totalSize);
int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = (char*)taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
}
*(int32_t*)(*result) = size;
int32_t offset = sizeof(int32_t);
*(int32_t*)(*result + offset) = size;
offset += sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
......@@ -3530,12 +3537,11 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
// recalculate the result size
int32_t realTotalSize = offset + sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize;
if (realTotalSize > totalSize) {
char* tmp = taosMemoryRealloc(*result, realTotalSize);
char* tmp = (char*)taosMemoryRealloc(*result, realTotalSize);
if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(*result);
*result = NULL;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
} else {
*result = tmp;
}
......@@ -3555,17 +3561,18 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
}
if (length) {
*length = offset;
}
return;
*(int32_t*)(*result) = offset;
*length = offset;
return TDB_CODE_SUCCESS;
}
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length) {
if (!result || length <= 0) {
return false;
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) {
if(result == NULL || length <= 0){
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)POINTER_SHIFT(pOperator->info, sizeof(SOptrBasicInfo));
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result);
......@@ -3578,7 +3585,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
// add a new result set for a new group
......@@ -3588,7 +3595,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset);
if (valueLen != pSup->resultRowSize) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
offset += sizeof(int32_t);
int32_t pageId = resultRow->pageId;
......@@ -3607,9 +3614,9 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
}
if (offset != length) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
return true;
return TDB_CODE_SUCCESS;
}
enum {
......@@ -4986,6 +4993,91 @@ _error:
return NULL;
}
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
int32_t code = TDB_CODE_SUCCESS;
char *pCurrent = NULL;
int32_t currLength = 0;
if(ops->fpSet.encodeResultRow){
if(result == NULL || length == NULL){
return TSDB_CODE_TSC_INVALID_INPUT;
}
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
if(code != TDB_CODE_SUCCESS){
if(*result != NULL){
taosMemoryFree(*result);
*result = NULL;
}
return code;
}
if(*result == NULL){
*result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t));
if (*result == NULL) {
taosMemoryFree(pCurrent);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*result + sizeof(int32_t), pCurrent, currLength);
*(int32_t*)(*result) = currLength + sizeof(int32_t);
}else{
int32_t sizePre = *(int32_t*)(*result);
char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength);
if (tmp == NULL) {
taosMemoryFree(pCurrent);
taosMemoryFree(*result);
*result = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
*result = tmp;
memcpy(*result + sizePre, pCurrent, currLength);
*(int32_t*)(*result) += currLength;
}
taosMemoryFree(pCurrent);
*length = *(int32_t*)(*result);
}
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
return code;
}
}
return TDB_CODE_SUCCESS;
}
int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){
int32_t code = TDB_CODE_SUCCESS;
if(ops->fpSet.decodeResultRow){
if(result == NULL || length <= 0){
return TSDB_CODE_TSC_INVALID_INPUT;
}
char* data = result + 2 * sizeof(int32_t);
int32_t dataLength = *(int32_t*)(result + sizeof(int32_t));
code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t));
if(code != TDB_CODE_SUCCESS){
return code;
}
int32_t totalLength = *(int32_t*)result;
if(totalLength == dataLength + sizeof(int32_t)) { // the last data
result = NULL;
length = 0;
}else{
result += dataLength;
*(int32_t*)(result) = totalLength - dataLength;
length = totalLength - dataLength;
}
}
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = decodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
return code;
}
}
return TDB_CODE_SUCCESS;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
EOPTR_EXEC_MODEL model) {
uint64_t queryId = pPlan->id.queryId;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册