提交 5ebd2ec8 编写于 作者: wmmhello's avatar wmmhello

feat:add encode/decode for operator

上级 780644c1
......@@ -156,13 +156,11 @@ typedef struct STaskAttr {
} STaskAttr;
struct SOperatorInfo;
struct SAggSupporter;
struct SOptrBasicInfo;
//struct SAggSupporter;
//struct SOptrBasicInfo;
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char** result, int32_t* length);
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggSupporter* pSup,
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length);
typedef int32_t (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char* result, int32_t length);
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
......@@ -439,14 +437,16 @@ typedef struct STimeWindowSupp {
} STimeWindowAggSupp;
typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
STimeWindow win; // query time range
bool timeWindowInterpo; // interpolation needed or not
char** pRow; // previous row/tuple of already processed datablock
SAggSupporter aggSup; // aggregate supporter
STableQueryInfo* pCurrent; // current tableQueryInfo struct
int32_t order; // current SSDataBlock scan order
EOPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
......@@ -457,19 +457,23 @@ typedef struct SIntervalAggOperatorInfo {
} SIntervalAggOperatorInfo;
typedef struct SStreamFinalIntervalOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo; // basic info
SAggSupporter aggSup; // aggregate supporter
SGroupResInfo groupResInfo; // multiple results build supporter
SInterval interval; // interval info
int32_t primaryTsIndex; // primary time stamp slot id from result of downstream operator.
SAggSupporter aggSup; // aggregate supporter
int32_t order; // current SSDataBlock scan order
STimeWindowAggSupp twAggSup;
SArray* pChildren;
} SStreamFinalIntervalOperatorInfo;
typedef struct SAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
STableQueryInfo *current;
uint64_t groupId;
SGroupResInfo groupResInfo;
......@@ -482,8 +486,10 @@ typedef struct SAggOperatorInfo {
} SAggOperatorInfo;
typedef struct SProjectOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SSDataBlock* existDataBlock;
SArray* pPseudoColInfo;
SLimit limit;
......@@ -507,7 +513,10 @@ typedef struct SFillOperatorInfo {
} SFillOperatorInfo;
typedef struct SGroupbyOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition;
......@@ -515,7 +524,6 @@ typedef struct SGroupbyOperatorInfo {
char* keyBuf; // group by keys for hash
int32_t groupKeyLen; // total group by column width
SGroupResInfo groupResInfo;
SAggSupporter aggSup;
SExprInfo* pScalarExprInfo;
int32_t numOfScalarExpr; // the number of scalar expression in group operator
SqlFunctionCtx* pScalarFuncCtx;
......@@ -552,8 +560,10 @@ typedef struct SWindowRowsSup {
} SWindowRowsSup;
typedef struct SSessionAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
bool reptScan; // next round scan
......@@ -591,8 +601,10 @@ typedef struct STimeSliceOperatorInfo {
} STimeSliceOperatorInfo;
typedef struct SStateWindowOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SGroupResInfo groupResInfo;
SWindowRowsSup winSup;
SColumn stateCol; // start row index
......@@ -604,8 +616,10 @@ typedef struct SStateWindowOperatorInfo {
} SStateWindowOperatorInfo;
typedef struct SSortedMergeOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo binfo;
SAggSupporter aggSup;
SArray* pSortInfo;
int32_t numOfSources;
SSortHandle *pSortHandle;
......@@ -617,7 +631,6 @@ typedef struct SSortedMergeOperatorInfo {
int32_t numOfResPerPage;
char** groupVal;
SArray *groupInfo;
SAggSupporter aggSup;
} SSortedMergeOperatorInfo;
typedef struct SSortOperatorInfo {
......@@ -778,8 +791,21 @@ void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo);
int32_t getMaximumIdleDurationSec();
int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** data);
int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* data);
/*
* ops: root operator
* data: *data save the result of encode, need to be freed by caller
* length: *length save the length of *data
* return: result code, 0 means success
*/
int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t *length);
/*
* ops: root operator, created by caller
* data: save the result of decode
* length: the length of data
* return: result code, 0 means success
*/
int32_t decodeOperator(SOperatorInfo* ops, char* data, int32_t length);
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
......@@ -787,10 +813,9 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo** pRes, int32_t* capacity,
int32_t* resNum);
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length);
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length);
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length);
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length);
STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts,
SInterval* pInterval, int32_t precision, STimeWindow* win);
int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn,
......
......@@ -3491,17 +3491,24 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
return (rows == 0) ? NULL : pInfo->pRes;
}
void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char** result,
int32_t* length) {
int32_t aggEncodeResultRow(SOperatorInfo* pOperator, char** result, int32_t* length) {
if(result == NULL || length == NULL){
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)(pOperator->info + sizeof(SOptrBasicInfo));
int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
size_t keyLen = sizeof(uint64_t) * 2; // estimate the key length
int32_t totalSize = sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
int32_t totalSize = sizeof(int32_t) + sizeof(int32_t) + size * (sizeof(int32_t) + keyLen + sizeof(int32_t) + pSup->resultRowSize);
*result = taosMemoryCalloc(1, totalSize);
if (*result == NULL) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
}
*(int32_t*)(*result) = size;
int32_t offset = sizeof(int32_t);
*(int32_t*)(*result + offset) = size;
offset += sizeof(int32_t);
// prepare memory
SResultRowPosition* pos = &pInfo->resultRowInfo.cur;
......@@ -3525,10 +3532,9 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
if (realTotalSize > totalSize) {
char* tmp = taosMemoryRealloc(*result, realTotalSize);
if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(*result);
*result = NULL;
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
return TSDB_CODE_OUT_OF_MEMORY;
} else {
*result = tmp;
}
......@@ -3548,17 +3554,18 @@ void aggEncodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
pIter = taosHashIterate(pSup->pResultRowHashTable, pIter);
}
if (length) {
*length = offset;
}
return;
*(int32_t*)(*result) = offset;
*length = offset;
return TDB_CODE_SUCCESS;
}
bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasicInfo* pInfo, char* result,
int32_t length) {
if (!result || length <= 0) {
return false;
int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result, int32_t length) {
if(result == NULL || length <= 0){
return TSDB_CODE_TSC_INVALID_INPUT;
}
SOptrBasicInfo* pInfo = (SOptrBasicInfo*)(pOperator->info);
SAggSupporter* pSup = (SAggSupporter*)(pOperator->info + sizeof(SOptrBasicInfo));
// int32_t size = taosHashGetSize(pSup->pResultRowHashTable);
int32_t count = *(int32_t*)(result);
......@@ -3571,7 +3578,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
uint64_t tableGroupId = *(uint64_t*)(result + offset);
SResultRow* resultRow = getNewResultRow_rv(pSup->pResultBuf, tableGroupId, pSup->resultRowSize);
if (!resultRow) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
// add a new result set for a new group
......@@ -3581,7 +3588,7 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
offset += keyLen;
int32_t valueLen = *(int32_t*)(result + offset);
if (valueLen != pSup->resultRowSize) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
offset += sizeof(int32_t);
int32_t pageId = resultRow->pageId;
......@@ -3600,9 +3607,9 @@ bool aggDecodeResultRow(SOperatorInfo* pOperator, SAggSupporter* pSup, SOptrBasi
}
if (offset != length) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_INVALID_INPUT);
return TSDB_CODE_TSC_INVALID_INPUT;
}
return true;
return TDB_CODE_SUCCESS;
}
enum {
......@@ -4954,64 +4961,89 @@ _error:
return NULL;
}
int32_t encodeExecTaskInfo(SOperatorInfo* ops, char** result){
uint8_t operatorType = ops->operatorType;
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t *length){
int32_t code = TDB_CODE_SUCCESS;
char *pCurrent = NULL;
int32_t currLength = 0;
if(ops->fpSet.encodeResultRow){
if(result == NULL || length == NULL){
return TSDB_CODE_TSC_INVALID_INPUT;
}
code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength);
int32_t num = 0;
int32_t size = ops->numOfDownstream;
if(code != TDB_CODE_SUCCESS){
if(*result != NULL){
taosMemoryFree(*result);
*result = NULL;
}
return code;
}
for (int32_t i = 0; i < size; ++i) {
encodeExecTaskInfo(ops->pDownstream[i], result);
if(*result == NULL){
*result = taosMemoryCalloc(1, currLength + sizeof(int32_t));
if (*result == NULL) {
taosMemoryFree(pCurrent);
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(*result + sizeof(int32_t), pCurrent, currLength);
*(int32_t*)(*result) = currLength + sizeof(int32_t);
}else{
int32_t sizePre = *(int32_t*)(*result);
char* tmp = taosMemoryRealloc(*result, sizePre + currLength);
if (tmp == NULL) {
taosMemoryFree(pCurrent);
taosMemoryFree(*result);
*result = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
}
*result = tmp;
memcpy(*result + sizePre, pCurrent, currLength);
*(int32_t*)(*result) += currLength;
}
taosMemoryFree(pCurrent);
*length = *(int32_t*)(*result);
}
SOperatorInfo* pOptr = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) {
if(ops->fpSet.encodeResultRow){
int32_t length = 0;
SAggOperatorInfo* pAggInfo = ops->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SAggSupporter *pSup = &pAggInfo->aggSup;
ops->fpSet.encodeResultRow(ops, pSup, pInfo, result, &length);
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = encodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) {
}
return 0;
return TDB_CODE_SUCCESS;
}
int32_t decodeExecTaskInfo(SOperatorInfo* ops, char* result){
uint8_t operatorType = ops->operatorType;
int32_t num = 0;
int32_t size = ops->numOfDownstream;
int32_t decodeOperator(SOperatorInfo* ops, char* result, int32_t length){
int32_t code = TDB_CODE_SUCCESS;
if(ops->fpSet.decodeResultRow){
if(result == NULL || length <= 0){
return TSDB_CODE_TSC_INVALID_INPUT;
}
char* data = result + 2 * sizeof(int32_t);
int32_t dataLength = *(int32_t*)(result + sizeof(int32_t));
code = ops->fpSet.decodeResultRow(ops, data, dataLength - sizeof(int32_t));
if(code != TDB_CODE_SUCCESS){
return code;
}
for (int32_t i = 0; i < size; ++i) {
decodeExecTaskInfo(ops->pDownstream[i], result);
int32_t totalLength = *(int32_t*)result;
if(totalLength == dataLength + sizeof(int32_t)) { // the last data
result = NULL;
length = 0;
}else{
result += dataLength;
*(int32_t*)(result) = totalLength - dataLength;
length = totalLength - dataLength;
}
}
SOperatorInfo* pOptr = NULL;
if (QUERY_NODE_PHYSICAL_PLAN_AGG == operatorType) {
if(ops->fpSet.decodeResultRow){
int32_t length = 0;
SAggOperatorInfo* pAggInfo = ops->info;
SOptrBasicInfo* pInfo = &pAggInfo->binfo;
SAggSupporter *pSup = &pAggInfo->aggSup;
taosHashClear(pSup->pResultRowHashTable);
pInfo->resultRowInfo.size = 0;
ops->fpSet.decodeResultRow(ops, pSup, pInfo, result, length);
for (int32_t i = 0; i < ops->numOfDownstream; ++i) {
code = decodeOperator(ops->pDownstream[i], result, length);
if(code != TDB_CODE_SUCCESS){
return code;
}
} else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == operatorType || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION_WINDOW == operatorType) {
} else if (QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW == operatorType) {
}
return 0;
return TDB_CODE_SUCCESS;
}
int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册