提交 6595b2bc 编写于 作者: wmmhello's avatar wmmhello

fix stddev group by json->key error

上级 1dfd6854
...@@ -258,7 +258,7 @@ void tscColumnListDestroy(SArray* pColList); ...@@ -258,7 +258,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid); void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src); void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar); void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson);
void tscDequoteAndTrimToken(SStrToken* pToken); void tscDequoteAndTrimToken(SStrToken* pToken);
void tscRmEscapeAndTrimToken(SStrToken* pToken); void tscRmEscapeAndTrimToken(SStrToken* pToken);
......
...@@ -453,7 +453,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo); ...@@ -453,7 +453,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo); int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted); void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar); void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent); void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam); void destroyTableNameList(SInsertStatementParam* pInsertParam);
......
...@@ -1840,7 +1840,9 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) { ...@@ -1840,7 +1840,9 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self; uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId); qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo, pSql->self, true); bool convertJson = true;
if (tscMultiRoundQuery(pQueryInfo, 0)) convertJson = false;
convertQueryResult(pRes, pQueryInfo, pSql->self, true, convertJson);
code = pRes->code; code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) { if (pRes->code == TSDB_CODE_SUCCESS) {
......
...@@ -3708,7 +3708,9 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) { ...@@ -3708,7 +3708,9 @@ TAOS_ROW doSetResultRowData(SSqlObj *pSql) {
int32_t type = pInfo->field.type; int32_t type = pInfo->field.type;
int32_t bytes = pInfo->field.bytes; int32_t bytes = pInfo->field.bytes;
if (!IS_VAR_DATA_TYPE(type) && type != TSDB_DATA_TYPE_JSON) { if (tscMultiRoundQuery(pQueryInfo, 0) && pQueryInfo->round == 0 && type == TSDB_DATA_TYPE_JSON){ // for json tag compare in the second round of stddev
pRes->tsrow[j] = pRes->urow[i];
}else if (!IS_VAR_DATA_TYPE(type) && type != TSDB_DATA_TYPE_JSON) {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i]; pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : pRes->urow[i];
} else { } else {
pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]); pRes->tsrow[j] = isNull(pRes->urow[i], type) ? NULL : varDataVal(pRes->urow[i]);
......
...@@ -712,7 +712,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) { ...@@ -712,7 +712,7 @@ int32_t tscCreateResPointerInfo(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar) { static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bool convertNchar, bool convertJson) {
// generated the user-defined column result // generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) { if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.colInfo.flag)) {
if (pInfo->pExpr->base.param[0].nType == TSDB_DATA_TYPE_NULL) { if (pInfo->pExpr->base.param[0].nType == TSDB_DATA_TYPE_NULL) {
...@@ -764,57 +764,61 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo ...@@ -764,57 +764,61 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
} }
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else if (pInfo->field.type == TSDB_DATA_TYPE_JSON) { }else if (pInfo->field.type == TSDB_DATA_TYPE_JSON) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol if (convertJson){
char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows); // convert unicode to native code in a temporary buffer extra one byte for terminated symbol
if (buffer == NULL) return; char* buffer = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
pRes->buffer[i] = buffer; if (buffer == NULL) return;
// string terminated char for binary data pRes->buffer[i] = buffer;
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows); // string terminated char for binary data
memset(pRes->buffer[i], 0, pInfo->field.bytes * pRes->numOfRows);
char* p = pRes->urow[i];
for (int32_t k = 0; k < pRes->numOfRows; ++k) { char* p = pRes->urow[i];
char* dst = pRes->buffer[i] + k * pInfo->field.bytes; for (int32_t k = 0; k < pRes->numOfRows; ++k) {
char type = *p; char* dst = pRes->buffer[i] + k * pInfo->field.bytes;
char* realData = p + CHAR_BYTES; char type = *p;
if (type == TSDB_DATA_TYPE_JSON && isNull(realData, TSDB_DATA_TYPE_JSON)) { char* realData = p + CHAR_BYTES;
memcpy(dst, realData, varDataTLen(realData)); if (type == TSDB_DATA_TYPE_JSON && isNull(realData, TSDB_DATA_TYPE_JSON)) {
} else if (type == TSDB_DATA_TYPE_BINARY) { memcpy(dst, realData, varDataTLen(realData));
assert(*(uint32_t*)varDataVal(realData) == TSDB_DATA_JSON_null); // json null value } else if (type == TSDB_DATA_TYPE_BINARY) {
assert(varDataLen(realData) == INT_BYTES); assert(*(uint32_t*)varDataVal(realData) == TSDB_DATA_JSON_null); // json null value
sprintf(varDataVal(dst), "%s", "null"); assert(varDataLen(realData) == INT_BYTES);
varDataSetLen(dst, strlen(varDataVal(dst))); sprintf(varDataVal(dst), "%s", "null");
}else if (type == TSDB_DATA_TYPE_JSON) { varDataSetLen(dst, strlen(varDataVal(dst)));
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(dst)); }else if (type == TSDB_DATA_TYPE_JSON) {
varDataSetLen(dst, length); int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), varDataVal(dst));
if (length == 0) { varDataSetLen(dst, length);
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); if (length == 0) {
} tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}else if (type == TSDB_DATA_TYPE_NCHAR) { // value -> "value" }
*(char*)varDataVal(dst) = '\"'; }else if (type == TSDB_DATA_TYPE_NCHAR) { // value -> "value"
int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), POINTER_SHIFT(varDataVal(dst), CHAR_BYTES)); *(char*)varDataVal(dst) = '\"';
*(char*)(POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES)) = '\"'; int32_t length = taosUcs4ToMbs(varDataVal(realData), varDataLen(realData), POINTER_SHIFT(varDataVal(dst), CHAR_BYTES));
varDataSetLen(dst, length + CHAR_BYTES*2); *(char*)(POINTER_SHIFT(varDataVal(dst), length + CHAR_BYTES)) = '\"';
if (length == 0) { varDataSetLen(dst, length + CHAR_BYTES*2);
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p); if (length == 0) {
tscError("charset:%s to %s. val:%s convert failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, (char*)p);
}
}else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(realData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)realData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
assert(0);
} }
}else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(realData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
}else if (type == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char *)realData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
}else {
assert(0);
}
p += pInfo->field.bytes; p += pInfo->field.bytes;
}
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
}else{
// if convertJson is false, json data as raw data used for stddev for the second round
} }
memcpy(pRes->urow[i], pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
} }
if (convertNchar) { if (convertNchar) {
...@@ -822,6 +826,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo ...@@ -822,6 +826,10 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
} }
} }
void tscJson2String(char *src, char* dst){
}
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) { void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
assert(pRes->numOfCols > 0); assert(pRes->numOfCols > 0);
if (pRes->numOfRows == 0) { if (pRes->numOfRows == 0) {
...@@ -835,11 +843,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) { ...@@ -835,11 +843,11 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo, bool converted) {
pRes->length[i] = pInfo->field.bytes; pRes->length[i] = pInfo->field.bytes;
offset += pInfo->field.bytes; offset += pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, converted ? false : true); setResRawPtrImpl(pRes, pInfo, i, converted ? false : true, true);
} }
} }
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar) { void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar, bool convertJson) {
assert(pRes->numOfCols > 0); assert(pRes->numOfCols > 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
...@@ -850,7 +858,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc ...@@ -850,7 +858,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
pRes->urow[i] = pColData->pData; pRes->urow[i] = pColData->pData;
pRes->length[i] = pInfo->field.bytes; pRes->length[i] = pInfo->field.bytes;
setResRawPtrImpl(pRes, pInfo, i, convertNchar); setResRawPtrImpl(pRes, pInfo, i, convertNchar, convertJson);
/* /*
// generated the user-defined column result // generated the user-defined column result
if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.ColName.flag)) { if (pInfo->pExpr->pExpr == NULL && TSDB_COL_IS_UD_COL(pInfo->pExpr->base.ColName.flag)) {
...@@ -1306,14 +1314,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp ...@@ -1306,14 +1314,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator; return pOperator;
} }
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar) { void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar, bool convertJson) {
// set the correct result // set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf; SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0; pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) { if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo); tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar); tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar, convertJson);
} }
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows); tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
...@@ -1472,7 +1480,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue ...@@ -1472,7 +1480,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self; uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId); qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px, pSql->self, false); convertQueryResult(pOutput, px, pSql->self, false, true);
} }
static void tscDestroyResPointerInfo(SSqlRes* pRes) { static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......
...@@ -1455,34 +1455,34 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc ...@@ -1455,34 +1455,34 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
} }
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
int32_t numOfOutput = pOperatorInfo->numOfOutput; int32_t numOfOutput = pOperatorInfo->numOfOutput;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order); int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr); bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
int32_t prevIndex = pResultRowInfo->curPos; int32_t prevIndex = pResultRowInfo->curPos;
TSKEY* tsCols = NULL; TSKEY* tsCols = NULL;
if (pSDataBlock->pDataBlock != NULL) { if (pSDataBlock->pDataBlock != NULL) {
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
tsCols = (int64_t*) pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
assert(tsCols[0] == pSDataBlock->info.window.skey && assert(tsCols[0] == pSDataBlock->info.window.skey &&
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
} }
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows); TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr); STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv); bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, int32_t ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS || pResult == NULL) { if (ret != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -1498,31 +1498,32 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1498,31 +1498,32 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already. for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
SResultRow* pRes = getResultRow(pResultRowInfo, j); SResultRow* pRes = getResultRow(pResultRowInfo, j);
if (pRes->closed) { if (pRes->closed) {
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP)); assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
continue; continue;
} }
STimeWindow w = pRes->win; STimeWindow w = pRes->win;
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult, ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &w, masterScan, &pResult,
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP)); assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1, doTimeWindowInterpolation(pOperatorInfo, pInfo, pSDataBlock->pDataBlock, *(TSKEY*)pRuntimeEnv->prevRow[0], -1,
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP); tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP); setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP); setNotInterpoWindowKey(pInfo->pCtx, pQueryAttr->numOfOutput, RESULT_ROW_START_INTERP);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &w, startPos, 0, tsCols, pSDataBlock->info.rows, numOfOutput);
} }
// restore current time window // restore current time window
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId, pInfo->pCtx, ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
numOfOutput, pInfo->rowCellInfoOffset); tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
...@@ -1541,31 +1542,30 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul ...@@ -1541,31 +1542,30 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
} }
// null data, failed to allocate more memory buffer // null data, failed to allocate more memory buffer
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan, &pResult, tableGroupId, int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &nextWin, masterScan,
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset); &pResult, tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
if (code != TSDB_CODE_SUCCESS || pResult == NULL) { if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY); longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
ekey = reviseWindowEkey(pQueryAttr, &nextWin); ekey = reviseWindowEkey(pQueryAttr, &nextWin);
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true); forwardStep =
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
// window start(end) key interpolation // window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep); doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &nextWin, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput); doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &nextWin, startPos, forwardStep, tsCols, pSDataBlock->info.rows,
numOfOutput);
} }
if (pQueryAttr->timeWindowInterpo) { if (pQueryAttr->timeWindowInterpo) {
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0; int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex); saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
} }
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey); updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
} }
static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current; STableQueryInfo* item = pRuntimeEnv->current;
...@@ -3250,7 +3250,7 @@ static void doSetTagValueInParam(void* pTable, char* param, int32_t paramLen, in ...@@ -3250,7 +3250,7 @@ static void doSetTagValueInParam(void* pTable, char* param, int32_t paramLen, in
}else{ }else{
getJsonTagValueAll(val, jsonVal, TSDB_MAX_JSON_TAGS_LEN); getJsonTagValueAll(val, jsonVal, TSDB_MAX_JSON_TAGS_LEN);
} }
tVariantCreateFromBinary(tag, jsonVal, CHAR_BYTES + varDataTLen(POINTER_SHIFT(jsonVal,CHAR_BYTES)), type); tVariantCreateFromBinary(tag, jsonVal, bytes, type);
} else { } else {
tVariantCreateFromBinary(tag, val, bytes, type); tVariantCreateFromBinary(tag, val, bytes, type);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册