提交 a28d714d 编写于 作者: H Haojun Liao

td-1423] fix bugs in group by nchar/binary columns. #3476

上级 562f6ce2
......@@ -57,7 +57,7 @@ typedef struct SWindowResult {
uint16_t numOfRows; // number of rows of current time window
bool closed; // this result status: closed or opened
SResultInfo* resultInfo; // For each result column, there is a resultInfo
TSKEY skey; // start key of current time window
union {STimeWindow win; char* key;}; // start key of current time window
} SWindowResult;
/**
......
......@@ -518,7 +518,7 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
} else {
int32_t slot = curTimeWindowIndex(pWindowResInfo);
SWindowResult* pWindowRes = getWindowResult(pWindowResInfo, slot);
w = GET_TIMEWINDOW(pWindowResInfo, pWindowRes);
w = pWindowRes->win;
}
if (w.skey > ts || w.ekey < ts) {
......@@ -624,7 +624,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
}
// set time window for current result
pWindowRes->skey = win->skey;
pWindowRes->win = (*win);
setWindowResOutputBufInitCtx(pRuntimeEnv, pWindowRes);
return TSDB_CODE_SUCCESS;
......@@ -697,12 +697,12 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
continue;
}
TSKEY ekey = pResult->skey + pWindowResInfo->interval;
TSKEY ekey = pResult->win.ekey;
if ((ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) ||
(pResult->skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
(pResult->win.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) {
closeTimeWindow(pWindowResInfo, i);
} else {
skey = pResult->skey;
skey = pResult->win.skey;
break;
}
}
......@@ -715,7 +715,7 @@ static int32_t doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKe
pWindowResInfo->curIndex = i;
}
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].skey;
pWindowResInfo->prevSKey = pWindowResInfo->pResult[pWindowResInfo->curIndex].win.skey;
// the number of completed slots are larger than the threshold, return current generated results to client.
if (numOfClosed > pWindowResInfo->threshold) {
......@@ -1097,8 +1097,25 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
int64_t v = -1;
// not assign result buffer yet, add new result buffer
char* d = pData;
int16_t len = bytes;
if (type == TSDB_DATA_TYPE_BINARY||type == TSDB_DATA_TYPE_NCHAR) {
d = varDataVal(pData);
len = varDataLen(pData);
} else if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
qError("QInfo:%p group by not supported on double/float/binary/nchar columns, abort", pQInfo);
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
}
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, d, len, true);
if (pWindowRes == NULL) {
return -1;
}
int64_t v = -1;
switch(type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break;
......@@ -1107,12 +1124,14 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break;
}
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes, true);
if (pWindowRes == NULL) {
return -1;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
pWindowRes->key = malloc(varDataTLen(pData));
varDataCopy(pWindowRes->key, pData);
} else {
pWindowRes->win.skey = v;
pWindowRes->win.ekey = v;
}
pWindowRes->skey = v;
assert(pRuntimeEnv->windowResInfo.interval == 0);
if (pWindowRes->pos.pageId == -1) {
......@@ -3004,7 +3023,7 @@ int32_t mergeIntoGroupResultImpl(SQInfo *pQInfo, SArray *pGroup) {
char *b = getPosInResultPage(pRuntimeEnv, PRIMARYKEY_TIMESTAMP_COL_INDEX, pWindowRes, page);
TSKEY ts = GET_INT64_VAL(b);
assert(ts == pWindowRes->skey);
assert(ts == pWindowRes->win.skey);
int64_t num = getNumOfResultWindowRes(pQuery, pWindowRes);
if (num <= 0) {
cs.position[pos] += 1;
......
......@@ -126,11 +126,26 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
assert(num >= 0 && num <= numOfClosed);
int16_t type = pWindowResInfo->type;
char *key = NULL;
int16_t bytes = -1;
for (int32_t i = 0; i < num; ++i) {
SWindowResult *pResult = &pWindowResInfo->pResult[i];
if (pResult->closed) { // remove the window slot from hash table
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->skey, pWindowResInfo->type);
// todo refactor
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key);
bytes = varDataLen(pResult->key);
} else {
key = (char*) &pResult->win.skey;
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
taosHashRemove(pWindowResInfo->hashList, (const char *)key, bytes);
} else {
break;
}
......@@ -150,15 +165,24 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
}
pWindowResInfo->size = remain;
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
SWindowResult *pResult = &pWindowResInfo->pResult[k];
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->skey,
tDataTypeDesc[pWindowResInfo->type].nSize);
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
key = varDataVal(pResult->key);
bytes = varDataLen(pResult->key);
} else {
key = (char*) &pResult->win.skey;
bytes = tDataTypeDesc[pWindowResInfo->type].nSize;
}
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)key, bytes);
assert(p != NULL);
int32_t v = (*p - num);
assert(v >= 0 && v <= pWindowResInfo->size);
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->skey, tDataTypeDesc[pWindowResInfo->type].nSize,
(char *)&v, sizeof(int32_t));
taosHashPut(pWindowResInfo->hashList, (char *)key, bytes, (char *)&v, sizeof(int32_t));
}
pWindowResInfo->curIndex = -1;
......@@ -207,20 +231,19 @@ void removeRedundantWindow(SWindowResInfo *pWindowResInfo, TSKEY lastKey, int32_
}
// get the result order
int32_t resultOrder = (pWindowResInfo->pResult[0].skey < pWindowResInfo->pResult[1].skey)? 1:-1;
int32_t resultOrder = (pWindowResInfo->pResult[0].win.skey < pWindowResInfo->pResult[1].win.skey)? 1:-1;
if (order != resultOrder) {
return;
}
int32_t i = 0;
if (order == QUERY_ASC_FORWARD_STEP) {
TSKEY ekey = pWindowResInfo->pResult[i].skey + pWindowResInfo->interval;
TSKEY ekey = pWindowResInfo->pResult[i].win.ekey;
while (i < pWindowResInfo->size && (ekey < lastKey)) {
++i;
}
} else if (order == QUERY_DESC_FORWARD_STEP) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].skey > lastKey)) {
while (i < pWindowResInfo->size && (pWindowResInfo->pResult[i].win.skey > lastKey)) {
++i;
}
}
......@@ -258,7 +281,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
pWindowRes->numOfRows = 0;
pWindowRes->pos = (SPosInfo){-1, -1};
pWindowRes->closed = false;
pWindowRes->skey = TSKEY_INITIAL_VAL;
pWindowRes->win = TSWINDOW_INITIALIZER;
}
/**
......@@ -268,7 +291,7 @@ void clearTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindow
*/
void copyTimeWindowResBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
dst->numOfRows = src->numOfRows;
dst->skey = src->skey;
dst->win = src->win;
dst->closed = src->closed;
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutput;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册