提交 da9f05c2 编写于 作者: L liu0x54

[TD-142]modify join process to support binary/nchar tag type

上级 1634038a
......@@ -47,14 +47,14 @@ typedef struct STSList {
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
tVariant tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
typedef struct STSElem {
TSKEY ts;
int64_t tag;
tVariant tag;
int32_t vnode;
} STSElem;
......@@ -66,7 +66,7 @@ typedef struct STSCursor {
} STSCursor;
typedef struct STSBlock {
int64_t tag; // tag value
tVariant tag; // tag value
int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed
int32_t padding; // 0xFFFFFFFF by default, after the payload
......@@ -123,7 +123,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
......@@ -134,7 +134,7 @@ void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
......
......@@ -4335,11 +4335,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if (pCtx->order == TSQL_SO_ASC) {
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, input, pCtx->size * TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
} else {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, d, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, pCtx->tag, d, TSDB_KEYSIZE);
}
}
......@@ -4359,7 +4359,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf *pTSbuf = pInfo->pTSBuf;
tsBufAppend(pTSbuf, 0, pCtx->tag.i64Key, pData, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, pCtx->tag, pData, TSDB_KEYSIZE);
SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG;
......
......@@ -82,14 +82,14 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
// for debug purpose
tscPrint("%" PRId64 ", tags:%d \t %" PRId64 ", tags:%d", elem1.ts, elem1.tag, elem2.ts, elem2.tag);
#endif
if (elem1.tag < elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem1.ts, elem2.ts))) {
int32_t ret = tVariantCompare(&elem1.tag,&elem2.tag );
if (ret < 0 || (ret == 0 && doCompare(order, elem1.ts, elem2.ts))) {
if (!tsBufNextPos(pSupporter1->pTSBuf)) {
break;
}
numOfInput1++;
} else if (elem1.tag > elem2.tag || (elem1.tag == elem2.tag && doCompare(order, elem2.ts, elem1.ts))) {
} else if (ret > 0 || (ret == 0 && doCompare(order, elem2.ts, elem1.ts))) {
if (!tsBufNextPos(pSupporter2->pTSBuf)) {
break;
}
......@@ -870,6 +870,11 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
return NULL;
}
pTSBuf->block.tag.pz = malloc(MEM_TAG_SIZE); //Need to define the length
if (pTSBuf->block.tag.pz == NULL) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->fileSize += getDataStartOffset();
return pTSBuf;
}
......@@ -1005,6 +1010,7 @@ void* tsBufDestory(STSBuf* pTSBuf) {
tfree(pTSBuf->pData);
tfree(pTSBuf->block.payload);
tfree(pTSBuf->block.tag.pz);
fclose(pTSBuf->f);
......@@ -1098,7 +1104,9 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
*
* both side has the compressed length is used to support load data forwards/backwords.
*/
fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
fwrite(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f);
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
......@@ -1107,7 +1115,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
int32_t blockSize = sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0;
......@@ -1150,11 +1158,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
pBlock->compLen = pBlock->padding;
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag.nType) + sizeof(pBlock->tag.nLen) + pBlock->tag.nLen ;
fseek(pTSBuf->f, -offset, SEEK_CUR);
}
fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
fread(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f);
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
......@@ -1212,9 +1222,10 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
return TSDB_CODE_SUCCESS;
}
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) {
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len) {
STSVnodeBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData;
int32_t tagEqual = 0;
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
writeDataToDisk(pTSBuf);
......@@ -1226,15 +1237,17 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData
}
assert(pBlockInfo->info.vnode == vnodeId);
tagEqual = tVariantCompare(&pTSBuf->block.tag,&tag);
if (pTSBuf->block.tag != tag && ptsData->len > 0) {
if (tagEqual !=0 && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first
writeDataToDisk(pTSBuf);
} else {
expandBuffer(ptsData, len);
}
pTSBuf->block.tag = tag;
//pTSBuf->block.tag = tag;
tVariantAssign(&pTSBuf->block.tag,&tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value
......@@ -1315,7 +1328,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
return 0;
}
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) {
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant tag) {
bool decomp = false;
int64_t offset = 0;
......@@ -1334,7 +1347,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
return -1;
}
if (pTSBuf->block.tag == tag) {
if (0 == tVariantCompare(&pTSBuf->block.tag,&tag)) {
return i;
}
}
......@@ -1513,7 +1526,9 @@ STSElem tsBufGetElem(STSBuf* pTSBuf) {
elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
elem1.tag = pBlock->tag;
elem1.tag.nType = pBlock->tag.nType;
elem1.tag.nLen = pBlock->tag.nLen;
elem1.tag.pz = pBlock->tag.pz;
return elem1;
}
......@@ -1644,7 +1659,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
return pTSBuf;
}
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag) {
STSElem elem = {.vnode = -1};
if (pTSBuf == NULL) {
......@@ -1723,7 +1738,7 @@ void tsBufDisplay(STSBuf* pTSBuf) {
while (tsBufNextPos(pTSBuf)) {
STSElem elem = tsBufGetElem(pTSBuf);
printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts);
printf("%d-%" PRId64 "\n", elem.vnode, elem.ts);
}
pTSBuf->cur.order = old;
......
......@@ -3199,10 +3199,10 @@ static bool validateJoinExprNode(SQueryInfo* pQueryInfo, tSQLExpr* pExpr, SColum
} else if (pLeftIndex->tableIndex == rightIndex.tableIndex) {
invalidSqlErrMsg(pQueryInfo->msg, msg4);
return false;
} else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) {
} /*else if (leftType == TSDB_DATA_TYPE_BINARY || leftType == TSDB_DATA_TYPE_NCHAR) {
invalidSqlErrMsg(pQueryInfo->msg, msg6);
return false;
}
}*/
// table to table/ super table to super table are allowed
if (UTIL_METER_IS_SUPERTABLE(pLeftMeterMeta) != UTIL_METER_IS_SUPERTABLE(pRightMeterMeta)) {
......
......@@ -75,6 +75,8 @@ void tVariantDestroy(tVariant *pV);
void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantToString(tVariant *pVar, char *dst);
int32_t tVariantDump(tVariant *pVariant, char *payload, char type);
......
......@@ -198,7 +198,7 @@ typedef struct SMeterQueryInfo {
int64_t ekey;
int32_t numOfRes;
int16_t queryRangeSet; // denote if the query range is set, only available for interval query
int64_t tag;
tVariant tag;
STSCursor cur;
int32_t sid; // for retrieve the page id list
......
......@@ -2557,7 +2557,7 @@ static int32_t doTSJoinFilter(SQueryRuntimeEnv *pRuntimeEnv, int32_t offset) {
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
// compare tag first
if (pCtx[0].tag.i64Key != elem.tag) {
if (0 != tVariantCompare(&pCtx[0].tag,&elem.tag)) {
return TS_JOIN_TAG_NOT_EQUALS;
}
......@@ -7461,7 +7461,8 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S
// both the master and supplement scan needs to set the correct ts comp start position
if (pRuntimeEnv->pTSBuf != NULL) {
if (pMeterQueryInfo->cur.vnodeIndex == -1) {
pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
//pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tVariantAssign(&pMeterQueryInfo->tag,&pRuntimeEnv->pCtx[0].tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag);
......
......@@ -500,7 +500,8 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vnodeIndex == -1) {
int64_t tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tVariant tag;
tVariantAssign(&tag,&pRuntimeEnv->pCtx[0].tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag);
// failed to find data with the specified tag value
......
......@@ -184,6 +184,34 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
memcpy(pDst->pz, pSrc->pz, len);
}
}
/* compare two tVariant, if same, return 0; else return nonezero */
int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return 1;
if (pSrc->nType != pDst->nType) return 1;
switch (pSrc->nType) {
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
if (pSrc->i64Key > pDst->i64Key){
return 1;
}else if (pSrc->i64Key < pDst->i64Key) {
return -1;
}else {
return 0;
}
case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR:
return strncasecmp(pSrc->pz,pDst->pz,pSrc->nLen);
default:
return 1;
}
}
int32_t tVariantToString(tVariant *pVar, char *dst) {
if (pVar == NULL || dst == NULL) return 0;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册