提交 66802544 编写于 作者: H hjxilinx

[Tbase-1279] support binary tag join.

上级 da9f05c2
......@@ -33,7 +33,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
#define MEM_BUF_SIZE (1<<20)
#define MEM_BUF_SIZE (1u<<20)
#define TS_COMP_BLOCK_PADDING 0xFFFFFFFF
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512
......@@ -123,7 +123,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
......@@ -134,7 +134,7 @@ void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
......
......@@ -4335,11 +4335,11 @@ static void ts_comp_function(SQLFunctionCtx *pCtx) {
// primary ts must be existed, so no need to check its existance
if (pCtx->order == TSQL_SO_ASC) {
tsBufAppend(pTSbuf, 0, pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, input, pCtx->size * TSDB_KEYSIZE);
} else {
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
char *d = GET_INPUT_CHAR_INDEX(pCtx, i);
tsBufAppend(pTSbuf, 0, pCtx->tag, d, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, d, TSDB_KEYSIZE);
}
}
......@@ -4359,7 +4359,7 @@ static void ts_comp_function_f(SQLFunctionCtx *pCtx, int32_t index) {
STSBuf *pTSbuf = pInfo->pTSBuf;
tsBufAppend(pTSbuf, 0, pCtx->tag, pData, TSDB_KEYSIZE);
tsBufAppend(pTSbuf, 0, &pCtx->tag, pData, TSDB_KEYSIZE);
SET_VAL(pCtx, pCtx->size, 1);
pResInfo->hasResult = DATA_SET_FLAG;
......
......@@ -109,8 +109,8 @@ static int64_t doTSBlockIntersect(SSqlObj* pSql, SJoinSubquerySupporter* pSuppor
*et = elem1.ts;
}
tsBufAppend(output1, elem1.vnode, elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
tsBufAppend(output1, elem1.vnode, &elem1.tag, (const char*)&elem1.ts, sizeof(elem1.ts));
tsBufAppend(output2, elem2.vnode, &elem2.tag, (const char*)&elem2.ts, sizeof(elem2.ts));
} else {
pLimit->offset -= 1;
}
......@@ -869,12 +869,7 @@ static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->block.tag.pz = malloc(MEM_TAG_SIZE); //Need to define the length
if (pTSBuf->block.tag.pz == NULL) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->fileSize += getDataStartOffset();
return pTSBuf;
}
......@@ -1010,7 +1005,8 @@ void* tsBufDestory(STSBuf* pTSBuf) {
tfree(pTSBuf->pData);
tfree(pTSBuf->block.payload);
tfree(pTSBuf->block.tag.pz);
tVariantDestroy(&pTSBuf->block.tag);
fclose(pTSBuf->f);
......@@ -1094,8 +1090,7 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
UNUSED(r);
/*int64_t r =*/ fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
/*
* format for output data:
......@@ -1105,8 +1100,14 @@ static void writeDataToDisk(STSBuf* pTSBuf) {
* both side has the compressed length is used to support load data forwards/backwords.
*/
fwrite(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
fwrite(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f);
fwrite(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
fwrite(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else {
fwrite(&pBlock->tag.i64Key, sizeof(int64_t), 1, pTSBuf->f);
}
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
......@@ -1145,10 +1146,13 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
STSBlock* pBlock = &pTSBuf->block;
// clear the memory buffer
tVariant t = pBlock->tag;
void* tmp = pBlock->payload;
memset(pBlock, 0, sizeof(STSBlock));
pBlock->payload = tmp;
pBlock->tag = t;
if (order == TSQL_SO_DESC) {
/*
* set the right position for the reversed traverse, the reversed traverse is started from
......@@ -1164,7 +1168,20 @@ STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
fread(&pBlock->tag.nType, sizeof(pBlock->tag.nType), 1, pTSBuf->f);
fread(&pBlock->tag.nLen, sizeof(pBlock->tag.nLen), 1, pTSBuf->f);
fread(pBlock->tag.pz,(size_t)pBlock->tag.nLen,1,pTSBuf->f);
// NOTE: mix types tags are not supported
if (pBlock->tag.nType == TSDB_DATA_TYPE_BINARY || pBlock->tag.nType == TSDB_DATA_TYPE_NCHAR) {
char* tp = realloc(pBlock->tag.pz, pBlock->tag.nLen + 1);
assert(tp != NULL);
memset(tp, 0, pBlock->tag.nLen + 1);
pBlock->tag.pz = tp;
fread(pBlock->tag.pz, (size_t)pBlock->tag.nLen, 1, pTSBuf->f);
} else {
fread(&pBlock->tag.i64Key, pBlock->tag.nLen, 1, pTSBuf->f);
}
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
......@@ -1222,10 +1239,11 @@ static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
return TSDB_CODE_SUCCESS;
}
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pData, int32_t len) {
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag, const char* pData, int32_t len) {
STSVnodeBlockInfoEx* pBlockInfo = NULL;
STSList* ptsData = &pTSBuf->tsData;
int32_t tagEqual = 0;
STSList* ptsData = &pTSBuf->tsData;
int32_t tagEqual = 0;
if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
writeDataToDisk(pTSBuf);
......@@ -1237,17 +1255,18 @@ void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag, const char* pDat
}
assert(pBlockInfo->info.vnode == vnodeId);
tagEqual = tVariantCompare(&pTSBuf->block.tag,&tag);
tagEqual = tVariantCompare(&pTSBuf->block.tag, tag);
if (tagEqual !=0 && ptsData->len > 0) {
if (tagEqual != 0 && ptsData->len > 0) {
// new arrived data with different tags value, save current value into disk first
writeDataToDisk(pTSBuf);
} else {
expandBuffer(ptsData, len);
}
//pTSBuf->block.tag = tag;
tVariantAssign(&pTSBuf->block.tag,&tag);
tVariantDestroy(&pTSBuf->block.tag);
tVariantAssign(&pTSBuf->block.tag, tag);
memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);
// todo check return value
......@@ -1328,7 +1347,7 @@ static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int
return 0;
}
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant tag) {
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, tVariant* tag) {
bool decomp = false;
int64_t offset = 0;
......@@ -1347,7 +1366,7 @@ static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo
return -1;
}
if (0 == tVariantCompare(&pTSBuf->block.tag,&tag)) {
if (0 == tVariantCompare(&pTSBuf->block.tag, tag)) {
return i;
}
}
......@@ -1659,7 +1678,7 @@ STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_
return pTSBuf;
}
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant tag) {
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, tVariant* tag) {
STSElem elem = {.vnode = -1};
if (pTSBuf == NULL) {
......
......@@ -75,7 +75,7 @@ void tVariantDestroy(tVariant *pV);
void tVariantAssign(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc);
int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc);
int32_t tVariantToString(tVariant *pVar, char *dst);
......
......@@ -7464,7 +7464,7 @@ int32_t setAdditionalInfo(STableQuerySupportObj *pSupporter, int32_t meterIdx, S
//pMeterQueryInfo->tag = pRuntimeEnv->pCtx[0].tag.i64Key;
tVariantAssign(&pMeterQueryInfo->tag,&pRuntimeEnv->pCtx[0].tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, pMeterQueryInfo->tag);
tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &pMeterQueryInfo->tag);
// keep the cursor info of current meter
pMeterQueryInfo->cur = pRuntimeEnv->pTSBuf->cur;
......
......@@ -500,9 +500,9 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
if (pRuntimeEnv->pTSBuf != NULL) {
if (pRuntimeEnv->cur.vnodeIndex == -1) {
tVariant tag;
tVariantAssign(&tag,&pRuntimeEnv->pCtx[0].tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, tag);
tVariant tag = {0};
tVariantAssign(&tag, &pRuntimeEnv->pCtx[0].tag);
STSElem elem = tsBufGetElemStartPos(pRuntimeEnv->pTSBuf, 0, &tag);
// failed to find data with the specified tag value
if (elem.vnode < 0) {
......
......@@ -114,27 +114,33 @@ void tVariantCreateFromBinary(tVariant *pVar, char *pz, uint32_t len, uint32_t t
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: {
pVar->i64Key = GET_INT8_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
pVar->i64Key = GET_INT16_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_INT: {
pVar->i64Key = GET_INT32_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
pVar->i64Key = GET_INT64_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
pVar->dKey = GET_DOUBLE_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_FLOAT: {
pVar->dKey = GET_FLOAT_VAL(pz);
pVar->nLen = sizeof(int64_t);
break;
}
case TSDB_DATA_TYPE_NCHAR: { // here we get the nchar length from raw binary bits length
......@@ -181,11 +187,13 @@ void tVariantAssign(tVariant *pDst, const tVariant *pSrc) {
}
pDst->pz = calloc(1, len);
printf("==============alloc assign:%p", pDst->pz);
memcpy(pDst->pz, pSrc->pz, len);
}
}
/* compare two tVariant, if same, return 0; else return nonezero */
int32_t tVariantCompare(tVariant *pDst, const tVariant *pSrc) {
int32_t tVariantCompare(const tVariant *pDst, const tVariant *pSrc) {
if (pSrc == NULL || pDst == NULL) return 1;
if (pSrc->nType != pDst->nType) return 1;
switch (pSrc->nType) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册