提交 0cd2043c 编写于 作者: H hjxilinx

refactor codes.

上级 b9888eaf
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void taosCloseConnCache(void *handle);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCCACHE_H
......@@ -24,7 +24,6 @@ extern "C" {
#include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
......@@ -33,121 +32,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
#define MEM_BUF_SIZE (1<<20)
#define TS_COMP_BLOCK_PADDING 0xFFFFFFFF
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512
// In-memory staging buffer for raw (uncompressed) timestamp data.
typedef struct STSList {
char* rawBuf; // heap buffer holding the raw timestamp bytes
int32_t allocSize; // number of bytes currently allocated for rawBuf
int32_t threshold; // tsBufAppend flushes the buffer to file once len reaches this value
int32_t len; // number of bytes of rawBuf currently in use
} STSList;
// Descriptor of a raw timestamp block. It is not referenced by the code visible
// in this file; the field notes below are inferred from the names - TODO confirm.
typedef struct STSRawBlock {
int32_t vnode; // presumably the vnode id the timestamps belong to
int64_t tag; // presumably the tag value shared by the timestamps
TSKEY* ts; // presumably the timestamp array
int32_t len; // presumably the length of ts (unit not visible here)
} STSRawBlock;
// One timestamp entry as returned by tsBufGetElem / tsBufGetElemStartPos.
typedef struct STSElem {
TSKEY ts; // timestamp value at the cursor position
int64_t tag; // tag of the block the timestamp was read from
int32_t vnode; // vnode id of the entry; -1 marks "no element"
} STSElem;
// Traversal position inside a STSBuf. tsBufResetPos sets the three indexes to -1.
typedef struct STSCursor {
int32_t vnodeIndex; // index into STSBuf.pData; -1 means the cursor is not positioned
int32_t blockIndex; // index of the current block within that vnode
int32_t tsIndex; // index of the current timestamp within the decompressed block
int32_t order; // traverse order (TSQL_SO_ASC or TSQL_SO_DESC)
} STSCursor;
// One compressed block of timestamps that share the same tag value.
typedef struct STSBlock {
int64_t tag; // tag value
int32_t numOfElem; // number of timestamps in the block
int32_t compLen; // size of payload after compression, in bytes
int32_t padding; // receives the 4-byte field stored after the payload (readDataFromDisk reads the trailing compLen copy into it)
char* payload; // compressed timestamp data
} STSBlock;
// Per-vnode index record; this is the part that is persisted in the vnode table of the file.
typedef struct STSVnodeBlockInfo {
int32_t vnode; // vnode id
/*
* The size of buffer file is not expected to be greater than 2G,
* and the offset of int32_t type is enough
*/
int32_t offset; // file offset of the first block of this vnode
int32_t numOfBlocks; // number of blocks written for this vnode
int32_t compLen; // total bytes occupied in the file by this vnode's blocks
} STSVnodeBlockInfo;
// In-memory extension of STSVnodeBlockInfo; the extra field is not stored in the file.
typedef struct STSVnodeBlockInfoEx {
STSVnodeBlockInfo info; // persisted index record
int32_t len; // length before compress
} STSVnodeBlockInfoEx;
// File-backed buffer of compressed timestamp blocks, grouped by vnode.
typedef struct STSBuf {
FILE* f; // stream of the backing file
char path[PATH_MAX]; // path of the backing file
uint32_t fileSize; // current file size in bytes; new blocks are written at this offset
STSVnodeBlockInfoEx* pData; // per-vnode index records
int32_t numOfAlloc; // number of slots allocated in pData
int32_t numOfVnodes; // number of slots of pData in use
char* assistBuf; // scratch buffer passed to the compress/decompress routines
int32_t bufSize; // size passed along with assistBuf (initialised to MEM_BUF_SIZE)
STSBlock block; // block most recently read from, or about to be written to, the file
STSList tsData; // uncompressed raw ts data
uint64_t numOfTotal; // total number of timestamps appended or merged in
bool autoDelete; // when true, tsBufDestory unlinks the backing file
int32_t tsOrder; // order of timestamp in ts comp buffer
STSCursor cur; // current traversal position
} STSBuf;
// Fixed header written at offset 0 of the backing file.
typedef struct STSBufFileHeader {
uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file
uint32_t tsOrder; // timestamp order in current file
} STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
STSBuf* tsBufClone(STSBuf* pTSBuf);
/**
* display all data in comp block file, for debug purpose only
* @param pTSBuf
*/
void tsBufDisplay(STSBuf* pTSBuf);
#ifdef __cplusplus
}
#endif
......
......@@ -25,13 +25,13 @@ extern "C" {
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tscCache.h"
#include "taosdef.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#include "qsqltype.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
......
......@@ -14,14 +14,15 @@
*/
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qhistogram.h"
#include "qinterpolation.h"
#include "qpercentile.h"
#include "qsyntaxtreefunction.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tscompression.h"
......
......@@ -15,12 +15,8 @@
#include "os.h"
#include "tscJoinProcess.h"
#include "tcache.h"
#include "tscUtil.h"
#include "tsclient.h"
#include "tscompression.h"
#include "ttime.h"
#include "tutil.h"
#include "qtsbuf.h"
static void freeSubqueryObj(SSqlObj* pSql);
......@@ -813,919 +809,4 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
}
}
}
}
/*
 * Size in bytes of the fixed file prologue: the file header followed by the
 * reserved table of TS_COMP_FILE_VNODE_MAX vnode index records.
 */
static int32_t getDataStartOffset() {
  size_t headerBytes = sizeof(STSBufFileHeader);
  size_t vnodeTableBytes = TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);

  return headerBytes + vnodeTableBytes;
}
/*
 * Write one vnode index record into the vnode table region of the backing file.
 *
 * @param pTSBuf  buffer whose file is updated
 * @param offset  absolute file offset of the record; must lie inside the file prologue
 * @param pVInfo  record to write
 * @return 0 on success, -1 if the offset is out of range or the seek/write fails
 */
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
  if (offset < 0 || offset >= getDataStartOffset()) {
    return -1;
  }

  if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) {
    return -1;
  }

  // a short write leaves a corrupt table entry behind, so report it instead of claiming success
  if (fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}
/*
 * Persist the index record of the vnode stored in slot `index` of the on-file
 * vnode table. The outcome of the write is not reported to the caller.
 */
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
  int32_t slotOffset = index * sizeof(STSVnodeBlockInfo) + sizeof(STSBufFileHeader);

  doUpdateVnodeInfo(pTSBuf, slotOffset, pBlockInfo);
}
/*
 * Allocate the working buffers of a freshly created STSBuf (vnode index table,
 * raw timestamp buffer, scratch buffer, block payload) and account for the file
 * prologue in fileSize.
 *
 * @return pTSBuf on success; on any allocation failure the buffer is destroyed
 *         through tsBufDestory and NULL is returned
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  enum { kInitialVnodeSlots = 4 };

  pTSBuf->numOfAlloc = kInitialVnodeSlots;
  pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto _alloc_failed;
  }

  pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto _alloc_failed;
  }

  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->bufSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto _alloc_failed;
  }

  pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto _alloc_failed;
  }

  // data blocks are stored after the header and the vnode table
  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

_alloc_failed:
  tsBufDestory(pTSBuf);
  return NULL;
}
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
/**
 * Create an empty ts buffer backed by a new temporary file.
 *
 * The file path is produced by getTmpfilePath("join", ...). The file header is
 * written immediately; the timestamp order stays undetermined (-1) until data
 * is appended.
 *
 * @param autoDelete  when true the backing file is removed by tsBufDestory
 * @return the new buffer, or NULL if memory allocation, opening the file or
 *         writing the header fails
 */
STSBuf* tsBufCreate(bool autoDelete) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  getTmpfilePath("join", pTSBuf->path);
  pTSBuf->f = fopen(pTSBuf->path, "w+");
  if (pTSBuf->f == NULL) {
    free(pTSBuf);
    return NULL;
  }

  /*
   * While the buffer is under construction the caller has no handle to the
   * freshly created file, so every failure path must remove it. The value
   * requested by the caller is installed once construction has succeeded.
   */
  pTSBuf->autoDelete = true;

  if (NULL == allocResForTSBuf(pTSBuf)) {
    return NULL;  // allocResForTSBuf has already destroyed pTSBuf
  }

  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC};
  if (STSBufUpdateHeader(pTSBuf, &header) != 0) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSQL_SO_ASC;

  pTSBuf->autoDelete = autoDelete;
  pTSBuf->tsOrder = -1;  // not known until the first timestamps are appended

  return pTSBuf;
}
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) {
return NULL;
}
strncpy(pTSBuf->path, path, PATH_MAX);
pTSBuf->f = fopen(pTSBuf->path, "r+");
if (pTSBuf->f == NULL) {
free(pTSBuf);
return NULL;
}
if (allocResForTSBuf(pTSBuf) == NULL) {
return NULL;
}
// validate the file magic number
STSBufFileHeader header = {0};
fseek(pTSBuf->f, 0, SEEK_SET);
fread(&header, 1, sizeof(header), pTSBuf->f);
// invalid file
if (header.magic != TS_COMP_FILE_MAGIC) {
return NULL;
}
if (header.numOfVnode > pTSBuf->numOfAlloc) {
pTSBuf->numOfAlloc = header.numOfVnode;
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc);
if (tmp == NULL) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->pData = tmp;
}
pTSBuf->numOfVnodes = header.numOfVnode;
// check the ts order
pTSBuf->tsOrder = header.tsOrder;
if (pTSBuf->tsOrder != TSQL_SO_ASC && pTSBuf->tsOrder != TSQL_SO_DESC) {
tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestory(pTSBuf);
return NULL;
}
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
//int64_t pos = ftell(pTSBuf->f); //pos not used
fread(buf, infoSize, 1, pTSBuf->f);
// the length value for each vnode is not kept in file, so does not set the length value
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i];
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo));
}
free(buf);
fseek(pTSBuf->f, 0, SEEK_END);
struct stat fileStat;
fstat(fileno(pTSBuf->f), &fileStat);
pTSBuf->fileSize = (uint32_t)fileStat.st_size;
tsBufResetPos(pTSBuf);
// ascending by default
pTSBuf->cur.order = TSQL_SO_ASC;
pTSBuf->autoDelete = autoDelete;
tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete);
return pTSBuf;
}
/**
 * Release every resource owned by the ts buffer: working buffers, the file
 * stream and, when autoDelete is set, the backing file itself.
 *
 * @param pTSBuf  buffer to destroy; NULL is accepted and ignored
 * @return always NULL, so callers can write `p = tsBufDestory(p)`
 */
void* tsBufDestory(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return NULL;
  }

  tfree(pTSBuf->assistBuf);
  tfree(pTSBuf->tsData.rawBuf);
  tfree(pTSBuf->pData);
  tfree(pTSBuf->block.payload);

  // fclose(NULL) is undefined behaviour, so tolerate a buffer whose stream was never opened
  if (pTSBuf->f != NULL) {
    fclose(pTSBuf->f);
    pTSBuf->f = NULL;
  }

  if (pTSBuf->autoDelete) {
    tscTrace("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path);
    unlink(pTSBuf->path);
  } else {
    tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path);
  }

  free(pTSBuf);
  return NULL;
}
/*
 * Return the index record of the most recently added vnode.
 * The buffer must contain at least one vnode (enforced by assert).
 */
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
  assert(pTSBuf->numOfVnodes >= 1);

  return pTSBuf->pData + (pTSBuf->numOfVnodes - 1);
}
/*
 * Register a new vnode at the end of the index table, both in memory and in
 * the backing file, and rewrite the file header with the new vnode count.
 *
 * @param pTSBuf   buffer to extend
 * @param vnodeId  id of the vnode to add
 * @return the index record of the new vnode, or NULL if growing the in-memory
 *         table fails (the buffer is left unchanged in that case)
 */
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
  // grow the in-memory table by 50% once every slot is taken
  if (pTSBuf->numOfVnodes >= pTSBuf->numOfAlloc) {
    uint32_t grownSlots = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
    assert(grownSlots > pTSBuf->numOfAlloc);

    STSVnodeBlockInfoEx* resized =
        (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * grownSlots);
    if (resized == NULL) {
      return NULL;
    }

    pTSBuf->pData = resized;
    pTSBuf->numOfAlloc = grownSlots;

    // clear every slot that is not in use yet
    memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0,
           sizeof(STSVnodeBlockInfoEx) * (grownSlots - pTSBuf->numOfVnodes));
  }

  int32_t slot = pTSBuf->numOfVnodes;

  // the previous vnode is complete now: store its final index record in the file
  if (slot > 0) {
    STSVnodeBlockInfoEx* pPrev = tsBufGetLastVnodeInfo(pTSBuf);
    TSBufUpdateVnodeInfo(pTSBuf, slot - 1, &pPrev->info);
  }

  // the blocks of the new vnode start at the current end of the file
  STSVnodeBlockInfo* pNewInfo = &pTSBuf->pData[slot].info;
  pNewInfo->vnode = vnodeId;
  pNewInfo->offset = pTSBuf->fileSize;
  assert(pNewInfo->offset >= getDataStartOffset());

  // write the initial record of the new vnode into the file
  TSBufUpdateVnodeInfo(pTSBuf, slot, pNewInfo);

  pTSBuf->numOfVnodes = slot + 1;

  // keep the vnode count stored in the file header in sync
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);

  return tsBufGetLastVnodeInfo(pTSBuf);
}
/*
 * Give memory back when the raw timestamp buffer has grown to at least twice
 * the flush threshold: the buffer is reallocated to MEM_BUF_SIZE bytes.
 * Shrinking is best effort; if realloc fails the larger buffer is kept.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2) {
    return;
  }

  char* tmp = realloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (tmp == NULL) {
    // on failure realloc leaves the original block valid; assigning the result
    // directly would have lost it and left rawBuf NULL
    return;
  }

  ptsData->rawBuf = tmp;
  ptsData->allocSize = MEM_BUF_SIZE;
}
/*
 * Compress the timestamps staged in pTSBuf->tsData and append them to the
 * backing file as one block, then update the bookkeeping of the file size and
 * of the last vnode. Does nothing when no data is staged.
 *
 * NOTE(review): results of fseek/fwrite are not checked here.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
  STSList* pList = &pTSBuf->tsData;
  if (pList->len == 0) {
    return;
  }

  STSBlock* pBlock = &pTSBuf->block;
  FILE*     fp = pTSBuf->f;

  pBlock->numOfElem = pList->len / TSDB_KEYSIZE;
  pBlock->compLen = tsCompressTimestamp(pList->rawBuf, pList->len, pList->len / TSDB_KEYSIZE, pBlock->payload,
                                        pList->allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);

  // new blocks are always appended at the logical end of the file
  int64_t r = fseek(fp, pTSBuf->fileSize, SEEK_SET);
  UNUSED(r);

  /*
   * Layout of one block in the file:
   *   tag | number of ts | compressed size | payload | compressed size
   *
   * The compressed size is stored on both sides of the payload so that blocks
   * can be loaded forwards as well as backwards.
   */
  fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, fp);
  fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, fp);
  fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, fp);
  fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, fp);
  fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, fp);

  int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;

  pTSBuf->fileSize += blockSize;
  pList->len = 0;

  STSVnodeBlockInfoEx* pLastVnode = tsBufGetLastVnodeInfo(pTSBuf);
  pLastVnode->info.numOfBlocks += 1;
  pLastVnode->info.compLen += blockSize;

  shrinkBuffer(pList);
}
/*
 * Make sure at least inputSize free bytes follow the used part of the raw
 * timestamp buffer, growing the buffer to exactly len + inputSize if needed.
 *
 * NOTE(review): a failed realloc is not handled (original todo) - rawBuf is
 * set to NULL and allocSize is still raised in that case.
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  int32_t freeBytes = ptsData->allocSize - ptsData->len;
  if (freeBytes >= inputSize) {
    return;  // enough room already
  }

  int32_t requiredSize = ptsData->len + inputSize;

  char* grown = realloc(ptsData->rawBuf, (size_t)requiredSize);
  if (grown == NULL) {
    // todo
  }

  ptsData->allocSize = requiredSize;
  ptsData->rawBuf = grown;
}
/*
 * Read one block at the current file position into pTSBuf->block.
 *
 * For TSQL_SO_ASC the block is read forwards and the file position ends up
 * after the block. For TSQL_SO_DESC the file position is expected to be at the
 * end of the block to read; after reading, the position is moved back to the
 * start of that block, i.e. the end of the previous one.
 *
 * @param order   TSQL_SO_ASC or TSQL_SO_DESC
 * @param decomp  when true the payload is also decompressed into pTSBuf->tsData
 * @return pointer to pTSBuf->block; as written this function never returns NULL
 *
 * NOTE(review): no fseek/fread result is checked and compLen read from the file
 * is not validated against the size of the payload buffer.
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
STSBlock* pBlock = &pTSBuf->block;
// reset the block descriptor but keep the already allocated payload buffer
void* tmp = pBlock->payload;
memset(pBlock, 0, sizeof(STSBlock));
pBlock->payload = tmp;
if (order == TSQL_SO_DESC) {
/*
* set the right position for the reversed traverse, the reversed traverse is started from
* the end of each comp data block
*/
// step back over the trailing length field and read it to learn the size of the block
fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR);
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
pBlock->compLen = pBlock->padding;
// total block size: tag + numOfElem + two length fields + payload
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
fseek(pTSBuf->f, -offset, SEEK_CUR);
}
// read the block in its on-file order: tag, number of elements, compressed length, payload
fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
if (decomp) {
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
}
// read the copy of the compressed length stored at the end of the block
fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f);
// for backwards traverse, set the start position at the end of previous block
if (order == TSQL_SO_DESC) {
int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag);
int64_t r = fseek(pTSBuf->f, -offset, SEEK_CUR);
UNUSED(r);
}
return pBlock;
}
/*
 * Determine the timestamp order of the buffer if it is still unknown (-1).
 *
 * With data already staged, the last staged key is compared with the first
 * incoming key. With an empty staging buffer, the first two incoming keys are
 * compared, provided more than one key is supplied; equal keys leave the order
 * undetermined.
 *
 * @param pData  incoming timestamps
 * @param len    size of pData in bytes
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  STSList* pStaged = &pTSBuf->tsData;

  if (pStaged->len > 0) {
    TSKEY prevKey = *(TSKEY*)(pStaged->rawBuf + pStaged->len - TSDB_KEYSIZE);
    pTSBuf->tsOrder = (prevKey > *(TSKEY*)pData) ? TSQL_SO_DESC : TSQL_SO_ASC;
    return TSDB_CODE_SUCCESS;
  }

  if (len > TSDB_KEYSIZE) {
    // no data in current vnode, more than one ts is added, check the orders
    TSKEY firstKey = *(TSKEY*)(pData);
    TSKEY secondKey = *(TSKEY*)(pData + TSDB_KEYSIZE);

    if (firstKey < secondKey) {
      pTSBuf->tsOrder = TSQL_SO_ASC;
    } else if (firstKey > secondKey) {
      pTSBuf->tsOrder = TSQL_SO_DESC;
    }
    // todo handle error: identical keys do not reveal the order
  }

  return TSDB_CODE_SUCCESS;
}
/**
 * Append raw timestamps of one (vnode, tag) pair to the buffer.
 *
 * A new vnode entry is started whenever vnodeId differs from the last vnode.
 * Data already staged for a different tag is written to the file first, and
 * staged data is flushed once it reaches the configured threshold. The cursor
 * is reset afterwards.
 *
 * If the vnode entry or the staging space cannot be allocated, the incoming
 * data is dropped and an error is logged.
 *
 * @param pTSBuf   target buffer
 * @param vnodeId  vnode the timestamps belong to
 * @param tag      tag value shared by the timestamps
 * @param pData    raw timestamps
 * @param len      size of pData in bytes
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) {
  STSVnodeBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId);
    if (pBlockInfo == NULL) {
      // addOneVnodeInfo returns NULL when the index table cannot grow; do not dereference it
      tscError("tsBuf %p failed to add vnode:%d, data is discarded", pTSBuf, vnodeId);
      return;
    }
  } else {
    pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf);
  }

  assert(pBlockInfo->info.vnode == vnodeId);

  if (pTSBuf->block.tag != tag && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  /*
   * Always make sure the incoming data fits. Flushing above may have shrunk the
   * staging buffer, so skipping this step after a flush could overflow it.
   * expandBuffer is a no-op when enough room is available.
   */
  expandBuffer(ptsData, len);
  if (ptsData->rawBuf == NULL || ptsData->allocSize - ptsData->len < len) {
    tscError("tsBuf %p failed to reserve %d bytes, data is discarded", pTSBuf, len);
    return;
  }

  pTSBuf->block.tag = tag;
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}
/**
 * Write all staged timestamps to the backing file and bring the on-file index
 * record of the last vnode and the file header up to date, then fsync the file.
 * Nothing happens when no data is staged.
 */
void tsBufFlush(STSBuf* pTSBuf) {
  STSList* pStaged = &pTSBuf->tsData;

  if (pStaged->len > 0) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(pStaged);

    // store the final index record of the last vnode in the file
    int32_t lastSlot = pTSBuf->numOfVnodes - 1;
    TSBufUpdateVnodeInfo(pTSBuf, lastSlot, &tsBufGetLastVnodeInfo(pTSBuf)->info);

    // save the ts order into header
    STSBufFileHeader header = {
        .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
    STSBufUpdateHeader(pTSBuf, &header);

    fsync(fileno(pTSBuf->f));
  }
}
/*
 * Linear search for the first index record whose vnode id equals vnodeId.
 *
 * @return the slot index, or -1 when no record matches
 */
static int32_t tsBufFindVnodeIndexFromId(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) {
  for (int32_t slot = 0; slot < numOfVnodes; ++slot) {
    if (pVnodeInfoEx[slot].info.vnode == vnodeId) {
      return slot;
    }
  }

  return -1;
}
/*
 * Load the block with index blockIndex of the given vnode into pTSBuf->block by
 * reading forwards from the first block of the vnode (payload is not
 * decompressed). For a descending cursor the file position is moved back to the
 * start of the loaded block afterwards.
 *
 * todo opt performance by cache blocks info
 *
 * @return 0 on success, -1 if seeking to the vnode data or reading a block fails
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // blocks have variable size, so blocks 0..blockIndex are read one after another
  for (int32_t n = 0; n <= blockIndex; ++n) {
    if (readDataFromDisk(pTSBuf, TSQL_SO_ASC, false) == NULL) {
      return -1;
    }
  }

  if (pTSBuf->cur.order != TSQL_SO_DESC) {
    return 0;
  }

  // set the file position to be the end of previous comp block
  STSBlock* pLoaded = &pTSBuf->block;
  int32_t   compBlockSize =
      pLoaded->compLen + sizeof(pLoaded->compLen) * 2 + sizeof(pLoaded->numOfElem) + sizeof(pLoaded->tag);
  fseek(pTSBuf->f, -compBlockSize, SEEK_CUR);

  return 0;
}
/*
 * Scan the blocks of one vnode in the current traverse order until a block
 * with the requested tag is found. The matching block is left loaded (not
 * decompressed) in pTSBuf->block.
 *
 * @param pBlockInfo  index record of the vnode to scan
 * @param tag         tag value to look for
 * @return index of the matching block counted from the first block of the
 *         vnode, or -1 if no block matches or the file cannot be read
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) {
  bool    decomp = false;
  bool    ascending = (pTSBuf->cur.order == TSQL_SO_ASC);
  int64_t offset = pBlockInfo->offset;

  if (!ascending) {
    // reversed traverse starts from the end of block
    offset += pBlockInfo->compLen;
  }

  if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) {
    return -1;
  }

  for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) {
      return -1;
    }

    if (pTSBuf->block.tag == tag) {
      /*
       * In descending order the i-th block read is the i-th block counted from
       * the end of the vnode. Callers store the result as the cursor's block
       * index, so it has to be converted to an index from the start.
       */
      return ascending ? i : (pBlockInfo->numOfBlocks - 1 - i);
    }
  }

  return -1;
}
/*
 * Load block blockIndex of the vnode in slot vnodeIndex, decompress it into
 * pTSBuf->tsData and point the cursor at its first element (ascending) or last
 * element (descending).
 *
 * When the cursor is already inside the same vnode and the requested block lies
 * in the traverse direction, the block is reached by reading forward from the
 * current file position; otherwise tsBufFindBlock restarts from the first block
 * of the vnode.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) {
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info;
// a block index beyond the number of blocks is treated as a programming error
if (pBlockInfo->numOfBlocks <= blockIndex) {
assert(false);
}
STSCursor* pCur = &pTSBuf->cur;
if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSQL_SO_ASC) ||
(pCur->blockIndex >= blockIndex && pCur->order == TSQL_SO_DESC))) {
int32_t i = 0;
bool decomp = false;
// number of blocks between the cursor and the target; with step == 0 nothing is read
// and the block currently held in pTSBuf->block is decompressed below
int32_t step = abs(blockIndex - pCur->blockIndex);
while ((++i) <= step) {
if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) {
return;
}
}
} else {
if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
assert(false);
}
}
STSBlock* pBlock = &pTSBuf->block;
// size of the block once decompressed
size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
/*
* In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
* may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
*/
if (s > pTSBuf->tsData.allocSize) {
expandBuffer(&pTSBuf->tsData, s);
}
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
// record the new position of the cursor
pCur->vnodeIndex = vnodeIndex;
pCur->blockIndex = blockIndex;
pCur->tsIndex = (pCur->order == TSQL_SO_ASC) ? 0 : pBlock->numOfElem - 1;
}
/**
 * Look up the index record of a vnode by its id.
 *
 * @return pointer to the record inside pTSBuf->pData, or NULL if the vnode is
 *         not present in the buffer
 */
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
  int32_t slot = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);

  return (slot == -1) ? NULL : &pTSBuf->pData[slot].info;
}
/*
 * Write the file header at offset 0 of the backing file.
 *
 * @param pTSBuf   buffer whose file is updated
 * @param pHeader  header to write; its magic must be TS_COMP_FILE_MAGIC
 * @return 0 on success, -1 on invalid arguments or when the seek/write fails
 */
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  // numOfVnode is unsigned, so only the stream, the pointer and the magic need checking
  if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  if (fseek(pTSBuf->f, 0, SEEK_SET) != 0) {
    return -1;
  }

  // report a short write instead of claiming the header was stored
  if (fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}
/**
 * Advance the cursor by one timestamp in the current traverse order.
 *
 * An unpositioned cursor (vnodeIndex == -1) is placed on the first element
 * (ascending) or the last element (descending) of the buffer. Otherwise the
 * cursor moves within the current block, to the neighbouring block, or to the
 * neighbouring vnode, loading blocks from the file as needed.
 *
 * @return true when the cursor points to a valid element afterwards, false
 *         when the buffer is NULL/empty or the traversal has reached its end
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vnodeIndex == -1) {
    if (pCur->order == TSQL_SO_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);

      if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }
    } else {  // get the last timestamp record in the last block of the last vnode
      assert(pTSBuf->numOfVnodes > 0);

      int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
      pCur->vnodeIndex = vnodeIndex;

      int32_t            vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
      STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
      int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

      tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex);

      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
      if (pTSBuf->block.numOfElem == 0) {
        tsBufResetPos(pTSBuf);
        return false;
      } else {
        return true;
      }
    }
  }

  bool    ascending = (pCur->order == TSQL_SO_ASC);
  bool    descending = (pCur->order == TSQL_SO_DESC);
  int32_t step = ascending ? 1 : -1;

  assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  // 1. still inside the current block: just move the element index
  bool endOfBlock = (ascending && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || (descending && pCur->tsIndex <= 0);
  if (!endOfBlock) {
    pCur->tsIndex += step;
    return true;
  }

  // 2. more blocks left in the current vnode: load the neighbouring block
  int32_t            vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
  STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);

  bool endOfVnode = pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && ascending) ||
                    (pCur->blockIndex <= 0 && descending);
  if (!endOfVnode) {
    tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step);
    return true;
  }

  // 3. current vnode exhausted: stop at the last vnode, otherwise enter the neighbouring one
  if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && ascending) || (pCur->vnodeIndex <= 0 && descending)) {
    pCur->vnodeIndex = -1;
    return false;
  }

  if (pBlockInfo == NULL) {
    return false;
  }

  /*
   * A descending traversal enters the neighbouring vnode at ITS last block, so
   * the block count must come from that vnode, not from the one just finished.
   */
  int32_t nextVnodeIndex = pCur->vnodeIndex + step;
  int32_t blockIndex = ascending ? 0 : pTSBuf->pData[nextVnodeIndex].info.numOfBlocks - 1;
  tsBufGetBlock(pTSBuf, nextVnodeIndex, blockIndex);

  return true;
}
/**
 * Put the cursor back into the "not positioned" state: all three indexes are
 * set to -1 while the traverse order is kept. NULL is accepted and ignored.
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    STSCursor* pCur = &pTSBuf->cur;

    pCur->vnodeIndex = -1;
    pCur->blockIndex = -1;
    pCur->tsIndex = -1;
  }
}
/**
 * Return the element the cursor currently points to.
 *
 * @return the element (vnode id, timestamp, tag of the loaded block); an
 *         element with vnode == -1 when pTSBuf is NULL or the cursor is not
 *         positioned
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem elem1 = {.vnode = -1};

  // check for NULL before forming any pointer into the buffer
  if (pTSBuf == NULL) {
    return elem1;
  }

  STSCursor* pCur = &pTSBuf->cur;
  if (pCur->vnodeIndex < 0) {
    return elem1;
  }

  STSBlock* pBlock = &pTSBuf->block;

  elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
  elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
  elem1.tag = pBlock->tag;

  return elem1;
}
/**
 * Append the data blocks of pSrcBuf to pDestBuf.
 *
 * Only a source holding exactly one vnode is supported. If vnodeId equals the
 * id of the last vnode of the destination, the source blocks extend that vnode;
 * otherwise (or when the destination holds no vnode yet) they are added as a
 * new vnode with id vnodeId.
 *
 * @param pDestBuf  destination buffer; staged data is flushed first
 * @param pSrcBuf   source buffer
 * @param vnodeId   vnode id assigned to the merged data
 * @return 0 on success or when there is nothing to merge, -1 on failure
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
    return 0;
  }

  if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) {
    return -1;
  }

  // src can only have one vnode index
  if (pSrcBuf->numOfVnodes > 1) {
    return -1;
  }

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // an empty destination has no last vnode to compare with: the source always becomes a new vnode
  if (pDestBuf->numOfVnodes == 0 || vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
    int32_t oldSize = pDestBuf->numOfVnodes;
    int32_t newSize = oldSize + pSrcBuf->numOfVnodes;

    if (pDestBuf->numOfAlloc < newSize) {
      STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
      if (tmp == NULL) {
        return -1;
      }

      // record the new capacity only after the reallocation has succeeded
      pDestBuf->pData = tmp;
      pDestBuf->numOfAlloc = newSize;
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx));

    // set the new offset value
    for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) {
      STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.vnode = vnodeId;
    }

    pDestBuf->numOfVnodes = newSize;
  } else {
    STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.vnode = vnodeId;
  }

  int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
  assert(r == 0);

  // copy everything after the prologue of the source file to the end of the destination file
  int64_t offset = getDataStartOffset();
  int32_t size = pSrcBuf->fileSize - offset;

#ifdef LINUX
  ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size);
#else
  ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size);
#endif

  if (rc == -1) {
    tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
    return -1;
  }

  if (rc != size) {
    tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno));
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int32_t oldSize = pDestBuf->fileSize;

  struct stat fileStat;
  fstat(fileno(pDestBuf->f), &fileStat);
  pDestBuf->fileSize = (uint32_t)fileStat.st_size;

  assert(pDestBuf->fileSize == oldSize + size);

  tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf,
           pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete);

  return 0;
}
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) {
STSBuf* pTSBuf = tsBufCreate(true);
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks;
pBlockInfo->compLen = len;
pBlockInfo->offset = getDataStartOffset();
pBlockInfo->vnode = 0;
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
fwrite((void*)pData, 1, len, pTSBuf->f);
pTSBuf->fileSize += len;
pTSBuf->tsOrder = order;
assert(order == TSQL_SO_ASC || order == TSQL_SO_DESC);
STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header);
fsync(fileno(pTSBuf->f));
return pTSBuf;
}
/**
 * Position the cursor on the block of vnode vnodeId that carries the given tag
 * and return the element the cursor then points to.
 *
 * @return the element at the new cursor position; an element with vnode == -1
 *         when pTSBuf is NULL, the vnode is unknown or no block has that tag
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
  STSElem notFound = {.vnode = -1};

  if (pTSBuf == NULL) {
    return notFound;
  }

  int32_t vnodeSlot = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
  if (vnodeSlot == -1) {
    return notFound;
  }

  // for debug purpose
  // tsBufDisplay(pTSBuf);

  int32_t blockSlot = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[vnodeSlot].info, tag);
  if (blockSlot < 0) {
    return notFound;
  }

  // the cursor is set first so that tsBufGetBlock finds it already on the target block
  pTSBuf->cur.vnodeIndex = vnodeSlot;
  pTSBuf->cur.blockIndex = blockSlot;
  tsBufGetBlock(pTSBuf, vnodeSlot, blockSlot);

  return tsBufGetElem(pTSBuf);
}
/**
 * Return a copy of the current cursor.
 *
 * @return the cursor of pTSBuf; for a NULL buffer a cursor with
 *         vnodeIndex == -1 and all other fields zero
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor unpositioned = {.vnodeIndex = -1};
  return unpositioned;
}
/**
 * Restore a previously saved cursor. For a positioned cursor
 * (vnodeIndex != -1) the referenced block is loaded first; afterwards the
 * cursor of the buffer is overwritten with *pCur. Does nothing when either
 * argument is NULL.
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    // assert(pCur->vnodeIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0);
    bool positioned = (pCur->vnodeIndex != -1);
    if (positioned) {
      tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex);
    }

    pTSBuf->cur = *pCur;
  }
}
/**
 * Set the order in which the cursor traverses the buffer.
 * The cursor position itself is not changed. NULL is accepted and ignored.
 */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}
/**
 * Open a second ts buffer on the backing file of pTSBuf. The clone is created
 * with autoDelete disabled.
 *
 * @return the new buffer, or NULL if pTSBuf is NULL or the file cannot be loaded
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  return (pTSBuf == NULL) ? NULL : tsBufCreateFromFile(pTSBuf->path, false);
}
/**
 * Print every element of the buffer to stdout as "vnode-tag-timestamp", in
 * ascending traverse order. For debugging only. The traverse order of the
 * buffer is restored afterwards; the cursor position is not.
 *
 * @param pTSBuf  buffer to dump; NULL is accepted and ignored
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return;
  }

  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfVnodes);

  int32_t old = pTSBuf->cur.order;
  pTSBuf->cur.order = TSQL_SO_ASC;

  tsBufResetPos(pTSBuf);

  while (tsBufNextPos(pTSBuf)) {
    STSElem elem = tsBufGetElem(pTSBuf);
    // the tag is an integer value, not a pointer: print it directly
    printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag, elem.ts);
  }

  pTSBuf->cur.order = old;
  printf("-------end of ts comp file-------\n");
}
}
\ No newline at end of file
......@@ -1317,7 +1317,7 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
pSql->fetchFp = pSql->fp;
// replace user defined callback function with multi-insert proxy function
pSql->fp = launchMultivnodeInsert;
pSql->fp = (void(*)())launchMultivnodeInsert;
}
ret = tsParseInsertSql(pSql);
......
......@@ -17,12 +17,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "taos.h"
#include "taosmsg.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "tast.h"
#include "tscUtil.h"
#include "tschemautil.h"
......
......@@ -29,6 +29,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "ttokendef.h"
#include "qast.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
......
......@@ -16,8 +16,8 @@
#include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "taosmsg.h"
#include "tast.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
......
......@@ -30,10 +30,10 @@
#include "mgmtVgroup.h"
#include "mnode.h"
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tast.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
......
......@@ -131,13 +131,6 @@ typedef struct tSidSet {
SColumnOrderInfo orderIdx;
} tSidSet;
/**
*
* @param fileNamePattern
* @param dstPath
*/
void getTmpfilePath(const char *fileNamePattern, char *dstPath);
/**
*
* @param inMemSize
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_STSBUF_H
#define TDENGINE_STSBUF_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
// size in bytes (1 MiB) of the raw ts buffer, the assist buffer and the block payload buffer
#define MEM_BUF_SIZE (1 << 20)
// magic number stored in the file header and verified when a buffer is opened from a file
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
// number of vnode info slots reserved in the file between the header and the data blocks
#define TS_COMP_FILE_VNODE_MAX 512
/* In-memory list of uncompressed timestamps that have not been written to the file yet. */
typedef struct STSList {
char* rawBuf;  // buffer holding the uncompressed timestamps
int32_t allocSize;  // number of bytes allocated for rawBuf
int32_t threshold;  // once len reaches this value, tsBufAppend writes the data to the file
int32_t len;  // number of bytes currently used in rawBuf
} STSList;
/*
 * Uncompressed block of timestamps that belong to one (vnode, tag) pair.
 * NOTE(review): this type is not referenced by the buffer implementation shown with this header;
 * the unit of len (bytes or elements) cannot be determined from here - confirm with callers.
 */
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
/* One element produced while traversing the buffer; vnode is -1 when no element is available. */
typedef struct STSElem {
TSKEY ts;  // timestamp at the cursor position
int64_t tag;  // tag value of the block the timestamp belongs to
int32_t vnode;  // vnode id of that block, -1 if the cursor is not positioned
} STSElem;
/* Traverse position inside a STSBuf; all three indexes are -1 when the cursor is not positioned. */
typedef struct STSCursor {
int32_t vnodeIndex;  // index into STSBuf.pData (not the vnode id)
int32_t blockIndex;  // index of the block inside the current vnode
int32_t tsIndex;  // index of the timestamp inside the decompressed current block
int32_t order;  // traverse order: TSQL_SO_ASC or TSQL_SO_DESC
} STSCursor;
/*
 * One compressed block. On disk a block is laid out as:
 * tag, numOfElem, compLen, payload, compLen (repeated so that blocks can be walked backwards).
 */
typedef struct STSBlock {
int64_t tag; // tag value
int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed
int32_t padding; // receives the trailing copy of compLen that follows the payload on disk
char* payload; // actual data that is compressed
} STSBlock;
/*
 * Per-vnode index entry, stored in the vnode info table of the file.
 *
 * The size of buffer file should not be greater than 2G,
 * and the offset of int32_t type is enough
 */
typedef struct STSVnodeBlockInfo {
int32_t vnode; // vnode id
int32_t offset; // file offset at which the blocks of this vnode start
int32_t numOfBlocks; // number of total blocks
int32_t compLen; // total size on disk of all blocks of this vnode, block headers included
} STSVnodeBlockInfo;
/* In-memory extension of STSVnodeBlockInfo; only the embedded info part is stored in the file. */
typedef struct STSVnodeBlockInfoEx {
STSVnodeBlockInfo info;
int32_t len; // length before compress
} STSVnodeBlockInfoEx;
/* File-backed buffer of compressed timestamps, grouped by vnode and tag. */
typedef struct STSBuf {
FILE* f;  // backing file
char path[PATH_MAX];  // path of the backing file
uint32_t fileSize;  // current size of the backing file in bytes, header and vnode table included
STSVnodeBlockInfoEx* pData;  // per-vnode index entries
int32_t numOfAlloc;  // number of entries allocated in pData
int32_t numOfVnodes;  // number of entries of pData that are in use
char* assistBuf;  // scratch buffer handed to the timestamp compress/decompress routines
int32_t bufSize;  // size passed along with assistBuf (initialised to MEM_BUF_SIZE)
STSBlock block;  // the block most recently read from / about to be written to the file
STSList tsData; // uncompressed raw ts data
uint64_t numOfTotal;  // total number of timestamps appended so far
bool autoDelete;  // remove the backing file when the buffer is destroyed
int32_t tsOrder; // order of timestamp in ts comp buffer, -1 while not determined yet
STSCursor cur;  // current traverse position
} STSBuf;
/* Header stored at offset 0 of the backing file, followed by the vnode info table. */
typedef struct STSBufFileHeader {
uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file
uint32_t tsOrder; // timestamp order in current file
} STSBufFileHeader;
// create an empty buffer backed by a new temporary file; returns NULL on failure
STSBuf* tsBufCreate(bool autoDelete);
// open a buffer on an existing file; returns NULL if the file cannot be opened or is not a valid ts buffer file
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
// create an auto-deleted buffer whose single vnode (id 0) holds the len bytes of already compressed blocks in pData
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
// release the buffer, close the file and remove it if autoDelete is set; always returns NULL
void* tsBufDestory(STSBuf* pTSBuf);
// append len bytes of timestamps (pData) that belong to the given vnode id and tag value
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
// append the data of a single-vnode source buffer to pDestBuf; returns 0 on success, -1 on failure
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
// open a second buffer object on the same backing file (never auto-deleted)
STSBuf* tsBufClone(STSBuf* pTSBuf);
// index entry of the given vnode id, or NULL if the vnode is not in the buffer
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
// write the timestamps still held in memory to the file and update the file header
void tsBufFlush(STSBuf* pTSBuf);
// reset the cursor to the "not positioned" state; the traverse order is kept
void tsBufResetPos(STSBuf* pTSBuf);
// element at the cursor position; its vnode field is -1 if the cursor is not positioned
STSElem tsBufGetElem(STSBuf* pTSBuf);
// move the cursor to the next element in traverse order; returns false when there is none
bool tsBufNextPos(STSBuf* pTSBuf);
// position the cursor on the first block of vnodeId carrying the given tag and return the element there
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
// copy of the current cursor
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
// set the traverse order (TSQL_SO_ASC or TSQL_SO_DESC) used by the cursor
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
// restore a cursor previously obtained from tsBufGetCursor
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
/**
 * display all data in comp block file, for debug purpose only
 * @param pTSBuf
 */
void tsBufDisplay(STSBuf* pTSBuf);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_STSBUF_H
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qast.h"
#include "os.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
......@@ -608,6 +608,7 @@ int32_t merge(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *
// }
//
// return pFinalRes->num;
return 0;
}
int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *pFinalRes) {
......@@ -642,6 +643,7 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults
// }
//
// return pFinalRes->num;
return 0;
}
/*
......
......@@ -25,29 +25,6 @@
/*
 * Address of the value stored for row `rowId` of column `colId` in `data`.
 * The values of one column are contiguous: a column starts at pFields[colId].offset * allrow
 * and every value in it takes pFields[colId].field.bytes bytes.
 * Every argument is parenthesized so that arbitrary expressions can be passed safely.
 */
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
  ((data) + (schema)->pFields[(colId)].offset * (allrow) + (rowId) * (schema)->pFields[(colId)].field.bytes)
int32_t tmpFileSerialNum = 0;
/**
 * Build a temporary file path of the form
 * "<tmp dir>tdengine-<fileNamePrefix>-<thread id>-<serial number>" in dstPath.
 *
 * The serial number comes from the process-wide counter tmpFileSerialNum, which is
 * incremented atomically on every call.
 *
 * @param fileNamePrefix  name fragment placed after the "tdengine-" prefix; it is copied
 *                        literally (it is never interpreted as a format string)
 * @param dstPath         output buffer of at least MAX_TMPFILE_PATH_LENGTH bytes; the result is
 *                        truncated to that size and always NUL-terminated
 */
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
  const char *tdengineTmpFileNamePrefix = "tdengine-";

#ifdef WINDOWS
  // NOTE(review): the value of %tmp% is used as is, no path separator is appended - confirm this is intended
  const char *tmpDir = getenv("tmp");
  if (tmpDir == NULL) {
    tmpDir = "";
  }
#else
  const char *tmpDir = "/tmp/";
#endif

  // format directly into the destination: no unbounded strcpy/strcat into an intermediate
  // buffer, and the caller supplied prefix is passed as an argument, not as part of the format
  snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, "%s%s%s-%llu-%u", tmpDir, tdengineTmpFileNamePrefix, fileNamePrefix,
           (unsigned long long)taosGetPthreadId(), (unsigned int)atomic_add_fetch_32(&tmpFileSerialNum, 1));
}
/*
* SColumnModel is deeply copy
*/
......
#include "qtsbuf.h"
#include "tscompression.h"
#include "tutil.h"
#include "taoserror.h"
// file offset at which the compressed data blocks start (header + reserved vnode info table)
static int32_t getDataStartOffset();
// write pBlockInfo into slot `index` of the vnode info table stored in the file
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo);
// allocate the in-memory buffers of pTSBuf; on failure pTSBuf is destroyed and NULL is returned
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf);
// write pHeader at the beginning of the file; returns 0 on success, -1 on failure
static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader);
/**
 * Create an empty timestamp buffer backed by a newly created temporary file.
 *
 * The file header is written immediately. The timestamp order of the buffer is left
 * undetermined (-1) until data is appended.
 *
 * @param autoDelete  when true, the backing file is removed by tsBufDestory
 * @return the new buffer, or NULL if memory allocation, file creation or the header write fails;
 *         on failure the temporary file is removed again
 */
STSBuf* tsBufCreate(bool autoDelete) {
  STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
  if (pTSBuf == NULL) {
    return NULL;
  }

  getTmpfilePath("join", pTSBuf->path);

  // binary mode: the file holds compressed binary data and must not go through newline translation
  pTSBuf->f = fopen(pTSBuf->path, "wb+");
  if (pTSBuf->f == NULL) {
    free(pTSBuf);
    return NULL;
  }

  // nobody else knows the path until this function succeeds, so failure paths must remove the file
  pTSBuf->autoDelete = true;

  if (NULL == allocResForTSBuf(pTSBuf)) {
    return NULL;  // allocResForTSBuf has already destroyed pTSBuf
  }

  // update the header info
  STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC};
  if (STSBufUpdateHeader(pTSBuf, &header) != 0) {
    tsBufDestory(pTSBuf);
    return NULL;
  }

  tsBufResetPos(pTSBuf);
  pTSBuf->cur.order = TSQL_SO_ASC;

  pTSBuf->autoDelete = autoDelete;
  pTSBuf->tsOrder = -1;  // not determined until data arrives

  return pTSBuf;
}
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
STSBuf* pTSBuf = calloc(1, sizeof(STSBuf));
if (pTSBuf == NULL) {
return NULL;
}
strncpy(pTSBuf->path, path, PATH_MAX);
pTSBuf->f = fopen(pTSBuf->path, "r+");
if (pTSBuf->f == NULL) {
free(pTSBuf);
return NULL;
}
if (allocResForTSBuf(pTSBuf) == NULL) {
return NULL;
}
// validate the file magic number
STSBufFileHeader header = {0};
fseek(pTSBuf->f, 0, SEEK_SET);
fread(&header, 1, sizeof(header), pTSBuf->f);
// invalid file
if (header.magic != TS_COMP_FILE_MAGIC) {
return NULL;
}
if (header.numOfVnode > pTSBuf->numOfAlloc) {
pTSBuf->numOfAlloc = header.numOfVnode;
STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc);
if (tmp == NULL) {
tsBufDestory(pTSBuf);
return NULL;
}
pTSBuf->pData = tmp;
}
pTSBuf->numOfVnodes = header.numOfVnode;
// check the ts order
pTSBuf->tsOrder = header.tsOrder;
if (pTSBuf->tsOrder != TSQL_SO_ASC && pTSBuf->tsOrder != TSQL_SO_DESC) {
// tscError("invalid order info in buf:%d", pTSBuf->tsOrder);
tsBufDestory(pTSBuf);
return NULL;
}
size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes;
STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize);
//int64_t pos = ftell(pTSBuf->f); //pos not used
fread(buf, infoSize, 1, pTSBuf->f);
// the length value for each vnode is not kept in file, so does not set the length value
for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) {
STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i];
memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo));
}
free(buf);
fseek(pTSBuf->f, 0, SEEK_END);
struct stat fileStat;
fstat(fileno(pTSBuf->f), &fileStat);
pTSBuf->fileSize = (uint32_t)fileStat.st_size;
tsBufResetPos(pTSBuf);
// ascending by default
pTSBuf->cur.order = TSQL_SO_ASC;
pTSBuf->autoDelete = autoDelete;
// tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f),
// pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete);
return pTSBuf;
}
/**
 * Release a timestamp buffer: free its memory, close the backing file and remove the
 * file when the buffer was created with autoDelete.
 *
 * @param pTSBuf  buffer to release, may be NULL
 * @return always NULL
 */
void* tsBufDestory(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    tfree(pTSBuf->block.payload);
    tfree(pTSBuf->pData);
    tfree(pTSBuf->tsData.rawBuf);
    tfree(pTSBuf->assistBuf);

    fclose(pTSBuf->f);

    // the file is kept on disk unless auto deletion was requested
    if (pTSBuf->autoDelete) {
      unlink(pTSBuf->path);
    }

    free(pTSBuf);
  }

  return NULL;
}
/* Index entry of the most recently added vnode; the buffer must contain at least one vnode. */
static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) {
  const int32_t tail = pTSBuf->numOfVnodes - 1;
  assert(tail > -1);

  return pTSBuf->pData + tail;
}
/*
 * Append an index entry for a new vnode whose blocks start at the current end of the file.
 * The entry of the previous vnode and the new entry are written to the vnode info table in
 * the file, and the file header is rewritten with the new vnode count.
 *
 * Returns the new entry, or NULL if the index array could not be enlarged.
 */
static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) {
// grow the index array by a factor of 1.5 when it is full
if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) {
uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5);
assert(newSize > pTSBuf->numOfAlloc);
STSVnodeBlockInfoEx* tmp = (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
if (tmp == NULL) {
return NULL;
}
pTSBuf->pData = tmp;
pTSBuf->numOfAlloc = newSize;
// clear every entry that is not in use yet
memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes));
}
if (pTSBuf->numOfVnodes > 0) {
STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info);
}
// set initial value for vnode block
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info;
pBlockInfo->vnode = vnodeId;
// the blocks of the new vnode start at the current end of the file
pBlockInfo->offset = pTSBuf->fileSize;
assert(pBlockInfo->offset >= getDataStartOffset());
// update vnode info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo);
// add one vnode info
pTSBuf->numOfVnodes += 1;
// update the header info
STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header);
return tsBufGetLastVnodeInfo(pTSBuf);
}
/*
 * Shrink the raw timestamp buffer back to MEM_BUF_SIZE if it has grown to at least twice
 * the flush threshold. If the reallocation fails, the existing (larger) buffer is kept.
 */
static void shrinkBuffer(STSList* ptsData) {
  if (ptsData->allocSize < ptsData->threshold * 2) {
    return;
  }

  // never assign the result of realloc directly: on failure the old buffer is still valid
  char* tmp = realloc(ptsData->rawBuf, MEM_BUF_SIZE);
  if (tmp == NULL) {
    return;
  }

  ptsData->rawBuf = tmp;
  ptsData->allocSize = MEM_BUF_SIZE;
}
/*
 * Compress the timestamps held in tsData and append them to the file as one block tagged
 * with block.tag. The in-memory data is cleared afterwards and the index entry of the last
 * vnode is updated. Does nothing when no timestamps are pending.
 */
static void writeDataToDisk(STSBuf* pTSBuf) {
if (pTSBuf->tsData.len == 0) {
return;
}
STSBlock* pBlock = &pTSBuf->block;
pBlock->numOfElem = pTSBuf->tsData.len / TSDB_KEYSIZE;
// NOTE(review): the output capacity passed here is tsData.allocSize although the destination is
// block.payload, which is allocated with MEM_BUF_SIZE bytes - confirm that allocSize cannot exceed it
pBlock->compLen =
tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
// new blocks are always appended at the logical end of the file
int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET);
UNUSED(r);
/*
 * format for output data:
 * 1. tags, number of ts, size after compressed, payload, size after compressed
 * 2. tags, number of ts, size after compressed, payload, size after compressed
 *
 * the compressed length is stored on both sides of the payload to support loading data forwards/backwards.
 */
fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f);
fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f);
fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f);
// total number of bytes this block occupies in the file
int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen;
pTSBuf->fileSize += blockSize;
pTSBuf->tsData.len = 0;
// account the new block to the vnode that is currently being written
STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);
pVnodeBlockInfoEx->info.compLen += blockSize;
pVnodeBlockInfoEx->info.numOfBlocks += 1;
shrinkBuffer(&pTSBuf->tsData);
}
/*
 * Make sure that at least inputSize more bytes fit into the raw timestamp buffer,
 * enlarging it to exactly len + inputSize bytes when the free space is too small.
 */
static void expandBuffer(STSList* ptsData, int32_t inputSize) {
  const int32_t freeBytes = ptsData->allocSize - ptsData->len;
  if (freeBytes >= inputSize) {
    return;
  }

  const int32_t required = ptsData->len + inputSize;

  // todo: a failed reallocation is not handled, rawBuf becomes NULL in that case
  ptsData->rawBuf = realloc(ptsData->rawBuf, (size_t)required);
  ptsData->allocSize = required;
}
/**
 * Read one compressed block at the current file position into pTSBuf->block.
 *
 * For an ascending read the file position must be at the start of a block and is left at
 * the end of it. For a descending read the file position must be at the end of a block and
 * is left at the start of it, i.e. at the end of the previous block.
 *
 * @param pTSBuf  buffer to read from
 * @param order   TSQL_SO_DESC for a backward read, any other value reads forward
 * @param decomp  when true the payload is also decompressed into pTSBuf->tsData
 * @return pTSBuf->block, or NULL if the block cannot be read completely or its stored
 *         compressed length does not fit into the payload buffer (MEM_BUF_SIZE bytes)
 */
STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) {
  STSBlock* pBlock = &pTSBuf->block;
  FILE*     fp = pTSBuf->f;

  // bytes a block occupies on disk besides its payload: tag, numOfElem and both copies of compLen
  const long overhead = (long)(sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2);

  // clear the block fields but keep the payload buffer
  void* payload = pBlock->payload;
  memset(pBlock, 0, sizeof(STSBlock));
  pBlock->payload = payload;

  if (order == TSQL_SO_DESC) {
    /*
     * the reversed traverse starts from the end of a block: read the trailing copy of the
     * compressed length first, then jump back to the start of the block
     */
    if (fseek(fp, -(long)sizeof(pBlock->padding), SEEK_CUR) != 0 ||
        fread(&pBlock->padding, sizeof(pBlock->padding), 1, fp) != 1) {
      return NULL;
    }

    pBlock->compLen = pBlock->padding;
    if (pBlock->compLen < 0 || pBlock->compLen > MEM_BUF_SIZE) {
      return NULL;
    }

    if (fseek(fp, -(overhead + pBlock->compLen), SEEK_CUR) != 0) {
      return NULL;
    }
  }

  if (fread(&pBlock->tag, sizeof(pBlock->tag), 1, fp) != 1 ||
      fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, fp) != 1 ||
      fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, fp) != 1) {
    return NULL;
  }

  // the payload buffer holds MEM_BUF_SIZE bytes, never read more than that into it
  if (pBlock->compLen < 0 || pBlock->compLen > MEM_BUF_SIZE) {
    return NULL;
  }

  // fread reports 0 items for a zero sized item, so only check the result for a non-empty payload
  if (pBlock->compLen > 0 && fread(pBlock->payload, (size_t)pBlock->compLen, 1, fp) != 1) {
    return NULL;
  }

  if (decomp) {
    pTSBuf->tsData.len =
        tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
                              pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
  }

  // read the copy of the compressed length stored after the payload
  if (fread(&pBlock->padding, sizeof(pBlock->padding), 1, fp) != 1) {
    return NULL;
  }

  // for backwards traverse, set the start position at the end of previous block
  if (order == TSQL_SO_DESC) {
    if (fseek(fp, -(overhead + pBlock->compLen), SEEK_CUR) != 0) {
      return NULL;
    }
  }

  return pBlock;
}
/*
 * Determine the timestamp order of the buffer if it has not been determined yet.
 *
 * When timestamps are already pending in memory, the last pending key is compared with the
 * first new key. Otherwise the first two new keys are compared, provided at least two
 * complete keys were passed. Equal keys leave the order undetermined.
 *
 * pData/len describe the new timestamps; pTSBuf->tsData.len must still be the length
 * before the new data is accounted. Always returns TSDB_CODE_SUCCESS.
 */
static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) {
  STSList* ptsData = &pTSBuf->tsData;

  if (pTSBuf->tsOrder != -1) {
    // todo the timestamp order is set, check the asc/desc order of appended data
    return TSDB_CODE_SUCCESS;
  }

  // keys are copied out with memcpy: pData is a byte buffer without alignment guarantee
  if (ptsData->len > 0) {
    TSKEY lastKey;
    TSKEY firstNewKey;
    memcpy(&lastKey, ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE, sizeof(lastKey));
    memcpy(&firstNewKey, pData, sizeof(firstNewKey));

    pTSBuf->tsOrder = (lastKey > firstNewKey) ? TSQL_SO_DESC : TSQL_SO_ASC;
  } else if (len >= (int32_t)(2 * TSDB_KEYSIZE)) {
    // no data in current vnode, more than one ts is added, check the orders
    TSKEY k1;
    TSKEY k2;
    memcpy(&k1, pData, sizeof(k1));
    memcpy(&k2, pData + TSDB_KEYSIZE, sizeof(k2));

    if (k1 < k2) {
      pTSBuf->tsOrder = TSQL_SO_ASC;
    } else if (k1 > k2) {
      pTSBuf->tsOrder = TSQL_SO_DESC;
    }
    // todo handle error: identical keys do not define an order
  }

  return TSDB_CODE_SUCCESS;
}
/**
 * Append timestamps that belong to the given vnode and tag value.
 *
 * Data is collected in memory and written to the file as a compressed block whenever the
 * vnode or the tag changes, or when the pending data reaches the flush threshold. The
 * cursor of the buffer is reset by this call.
 *
 * @param pTSBuf   destination buffer
 * @param vnodeId  vnode the timestamps belong to
 * @param tag      tag value the timestamps belong to
 * @param pData    timestamps to append
 * @param len      size of pData in bytes
 */
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) {
  STSVnodeBlockInfoEx* pBlockInfo = NULL;
  STSList*             ptsData = &pTSBuf->tsData;

  if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) {
    // data of a new vnode: write what is pending for the previous vnode first
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);

    pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId);
  } else {
    pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf);
  }

  assert(pBlockInfo->info.vnode == vnodeId);

  if (pTSBuf->block.tag != tag && ptsData->len > 0) {
    // new arrived data with different tags value, save current value into disk first
    writeDataToDisk(pTSBuf);
  }

  /*
   * Always make room for the new data. Writing the pending data may have shrunk the buffer
   * back to its default size, which is not necessarily large enough for the incoming data.
   */
  expandBuffer(ptsData, len);

  pTSBuf->block.tag = tag;
  memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len);

  // todo check return value
  setCheckTSOrder(pTSBuf, pData, len);

  ptsData->len += len;
  pBlockInfo->len += len;

  pTSBuf->numOfTotal += len / TSDB_KEYSIZE;

  // the size of raw data exceeds the size of the default prepared buffer, so
  // during getBufBlock, the output buffer needs to be large enough.
  if (ptsData->len >= ptsData->threshold) {
    writeDataToDisk(pTSBuf);
    shrinkBuffer(ptsData);
  }

  tsBufResetPos(pTSBuf);
}
/**
 * Write the timestamps still held in memory to the file, update the index entry of the last
 * vnode and the file header, and push everything to the storage device.
 * Does nothing when no timestamps are pending.
 *
 * @param pTSBuf  buffer to flush
 */
void tsBufFlush(STSBuf* pTSBuf) {
  if (pTSBuf->tsData.len <= 0) {
    return;
  }

  writeDataToDisk(pTSBuf);
  shrinkBuffer(&pTSBuf->tsData);

  STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf);

  // update prev vnode length info in file
  TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info);

  // save the ts order into header
  STSBufFileHeader header = {
      .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
  STSBufUpdateHeader(pTSBuf, &header);

  // fsync only covers data that already reached the file descriptor, so drain the stdio buffer first
  fflush(pTSBuf->f);
  fsync(fileno(pTSBuf->f));
}
/* Position of the first index entry whose vnode id equals vnodeId, or -1 if there is none. */
static int32_t tsBufFindVnodeIndexFromId(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) {
  for (int32_t idx = 0; idx < numOfVnodes; ++idx) {
    if (pVnodeInfoEx[idx].info.vnode == vnodeId) {
      return idx;
    }
  }

  return -1;
}
/*
 * Load block number blockIndex of the given vnode into pTSBuf->block by reading the blocks
 * of the vnode one by one from its first block. For a descending cursor the file position
 * is moved back to the start of the loaded block afterwards.
 *
 * Returns 0 on success, -1 on failure.
 * todo opt performance by cache blocks info
 */
static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) {
  if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) {
    return -1;
  }

  // sequentially read the compressed blocks (without decompressing) up to the requested one
  for (int32_t n = 0; n <= blockIndex; ++n) {
    if (readDataFromDisk(pTSBuf, TSQL_SO_ASC, false) == NULL) {
      return -1;
    }
  }

  if (pTSBuf->cur.order != TSQL_SO_DESC) {
    return 0;
  }

  // set the file position to be the end of previous comp block
  STSBlock* pLoaded = &pTSBuf->block;
  int32_t   bytesOnDisk =
      pLoaded->compLen + sizeof(pLoaded->compLen) * 2 + sizeof(pLoaded->numOfElem) + sizeof(pLoaded->tag);
  fseek(pTSBuf->f, -bytesOnDisk, SEEK_CUR);

  return 0;
}
/*
 * Search the blocks of one vnode, in cursor order, for the first block carrying the given tag.
 * The matching block is left loaded in pTSBuf->block.
 *
 * Returns the number of blocks read before the match (0 for the first block visited),
 * or -1 if no block matches or reading fails.
 */
static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) {
  // an ascending search starts at the first block, a reversed one at the end of the last block
  int64_t startPos = (pTSBuf->cur.order == TSQL_SO_ASC) ? pBlockInfo->offset : pBlockInfo->offset + pBlockInfo->compLen;

  if (fseek(pTSBuf->f, startPos, SEEK_SET) != 0) {
    return -1;
  }

  int32_t visited = 0;
  while (visited < pBlockInfo->numOfBlocks) {
    if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, false) == NULL) {
      return -1;
    }

    if (pTSBuf->block.tag == tag) {
      return visited;
    }

    ++visited;
  }

  return -1;
}
/*
 * Load block blockIndex of the vnode at vnodeIndex, decompress it into tsData and place the
 * cursor on its first timestamp in traverse order (index 0 for ascending, last for descending).
 * blockIndex must be smaller than the number of blocks of that vnode.
 */
static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) {
STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info;
if (pBlockInfo->numOfBlocks <= blockIndex) {
assert(false);
}
STSCursor* pCur = &pTSBuf->cur;
// the requested block lies at or ahead of the cursor within the same vnode:
// continue reading from the current file position instead of starting over
if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSQL_SO_ASC) ||
(pCur->blockIndex >= blockIndex && pCur->order == TSQL_SO_DESC))) {
int32_t i = 0;
bool decomp = false;
// number of blocks to read; 0 means the block held in pTSBuf->block is used as is
int32_t step = abs(blockIndex - pCur->blockIndex);
while ((++i) <= step) {
if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) {
return;
}
}
} else {
// otherwise locate the block by scanning from the first block of the vnode
if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) {
assert(false);
}
}
STSBlock* pBlock = &pTSBuf->block;
// number of bytes the block needs once it is decompressed
size_t s = pBlock->numOfElem * TSDB_KEYSIZE;
/*
 * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value
 * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function
 */
if (s > pTSBuf->tsData.allocSize) {
expandBuffer(&pTSBuf->tsData, s);
}
pTSBuf->tsData.len =
tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf,
pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize);
assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len));
pCur->vnodeIndex = vnodeIndex;
pCur->blockIndex = blockIndex;
pCur->tsIndex = (pCur->order == TSQL_SO_ASC) ? 0 : pBlock->numOfElem - 1;
}
/**
 * Look up the index entry of a vnode.
 *
 * @param pTSBuf   buffer to search
 * @param vnodeId  id of the vnode
 * @return the index entry of the vnode, or NULL if the buffer holds no data of that vnode
 */
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) {
  const int32_t idx = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);

  return (idx == -1) ? NULL : &pTSBuf->pData[idx].info;
}
/*
 * Write the file header at the beginning of the backing file.
 *
 * Returns 0 on success. Returns -1 if the buffer has no open file, pHeader is NULL or does
 * not carry the expected magic number, or if seeking/writing fails.
 */
int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) {
  // numOfVnode is unsigned, so there is no negative value to reject
  if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->magic != TS_COMP_FILE_MAGIC) {
    return -1;
  }

  if (fseek(pTSBuf->f, 0, SEEK_SET) != 0) {
    return -1;
  }

  if (fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}
/**
 * Move the cursor to the next timestamp in traverse order.
 *
 * If the cursor is not positioned yet, it is placed on the first element (ascending order)
 * or on the last element (descending order) of the buffer. Otherwise it advances inside the
 * current block, then to the neighbouring block of the same vnode, then to the neighbouring
 * vnode.
 *
 * @param pTSBuf  buffer to traverse, may be NULL
 * @return true if the cursor points to a valid element afterwards, false if the buffer is
 *         NULL or empty, or the end of the traverse has been reached
 */
bool tsBufNextPos(STSBuf* pTSBuf) {
  if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) {
    return false;
  }

  STSCursor* pCur = &pTSBuf->cur;

  // get the first/last position according to traverse order
  if (pCur->vnodeIndex == -1) {
    if (pCur->order == TSQL_SO_ASC) {
      tsBufGetBlock(pTSBuf, 0, 0);
    } else {  // get the last timestamp record in the last block of the last vnode
      assert(pTSBuf->numOfVnodes > 0);

      int32_t vnodeIndex = pTSBuf->numOfVnodes - 1;
      pCur->vnodeIndex = vnodeIndex;

      int32_t            vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
      STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);
      int32_t            blockIndex = pBlockInfo->numOfBlocks - 1;

      tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex);
      pCur->tsIndex = pTSBuf->block.numOfElem - 1;
    }

    if (pTSBuf->block.numOfElem == 0) {  // the whole list is empty, return
      tsBufResetPos(pTSBuf);
      return false;
    }

    return true;
  }

  int32_t step = pCur->order == TSQL_SO_ASC ? 1 : -1;

  assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE);

  bool endOfBlock = (pCur->order == TSQL_SO_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) ||
                    (pCur->order == TSQL_SO_DESC && pCur->tsIndex <= 0);
  if (!endOfBlock) {
    // more timestamps are left in the current block
    pCur->tsIndex += step;
    return true;
  }

  int32_t            vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
  STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId);

  bool endOfVnode = pBlockInfo == NULL ||
                    (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSQL_SO_ASC) ||
                    (pCur->blockIndex <= 0 && pCur->order == TSQL_SO_DESC);
  if (!endOfVnode) {
    // move to the neighbouring block of the same vnode
    tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step);
    return true;
  }

  if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSQL_SO_ASC) ||
      (pCur->vnodeIndex <= 0 && pCur->order == TSQL_SO_DESC)) {
    // no vnode left in traverse direction
    pCur->vnodeIndex = -1;
    return false;
  }

  if (pBlockInfo == NULL) {
    return false;
  }

  /*
   * Move to the neighbouring vnode. A descending traverse enters it at its last block, so
   * the block count must be taken from the vnode being entered, not from the one being left.
   */
  int32_t nextVnodeIndex = pCur->vnodeIndex + step;
  int32_t blockIndex =
      (pCur->order == TSQL_SO_ASC) ? 0 : pTSBuf->pData[nextVnodeIndex].info.numOfBlocks - 1;
  tsBufGetBlock(pTSBuf, nextVnodeIndex, blockIndex);

  return true;
}
/**
 * Put the cursor back into the "not positioned" state. The traverse order is kept.
 *
 * @param pTSBuf  buffer whose cursor is reset, may be NULL
 */
void tsBufResetPos(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    STSCursor* pCur = &pTSBuf->cur;

    pCur->vnodeIndex = -1;
    pCur->blockIndex = -1;
    pCur->tsIndex = -1;
  }
}
/**
 * Return the element the cursor currently points to.
 *
 * @param pTSBuf  buffer to read from, may be NULL
 * @return the timestamp, tag and vnode id at the cursor position; the vnode field is -1
 *         if the buffer is NULL or the cursor is not positioned
 */
STSElem tsBufGetElem(STSBuf* pTSBuf) {
  STSElem elem1 = {.vnode = -1};

  // check the buffer before touching any of its members
  if (pTSBuf == NULL) {
    return elem1;
  }

  STSCursor* pCur = &pTSBuf->cur;
  if (pCur->vnodeIndex < 0) {
    return elem1;
  }

  STSBlock* pBlock = &pTSBuf->block;

  elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode;
  elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE);
  elem1.tag = pBlock->tag;

  return elem1;
}
/**
 * Append the compressed blocks of a source buffer to the end of a destination buffer.
 *
 * Only a source buffer holding a single vnode is currently supported. If vnodeId equals the
 * id of the last vnode of the destination, the source blocks are added to that vnode;
 * otherwise a new vnode entry with id vnodeId is appended to the destination.
 * Data pending in the memory of the destination is written to its file first.
 *
 * @param pDestBuf  buffer that receives the data
 * @param pSrcBuf   buffer providing the data
 * @param vnodeId   vnode id assigned to the merged data (named vnodeIdx in the header declaration)
 * @return 0 on success or when there is nothing to merge (NULL argument or empty source);
 *         -1 if the vnode limit would be exceeded, the source holds more than one vnode,
 *         memory allocation fails or copying the file content fails
 */
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) {
  if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) {
    return 0;
  }

  if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) {
    return -1;
  }

  // src can only have one vnode index
  if (pSrcBuf->numOfVnodes > 1) {
    return -1;
  }

  // there are data in buffer, flush to disk first
  tsBufFlush(pDestBuf);

  // compared with the last vnode id; an empty destination has no last vnode and always gets a new entry
  if (pDestBuf->numOfVnodes == 0 || vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) {
    int32_t oldSize = pDestBuf->numOfVnodes;
    int32_t newSize = oldSize + pSrcBuf->numOfVnodes;

    if (pDestBuf->numOfAlloc < newSize) {
      STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize);
      if (tmp == NULL) {
        return -1;
      }

      // record the new capacity only after the reallocation succeeded
      pDestBuf->pData = tmp;
      pDestBuf->numOfAlloc = newSize;
    }

    // directly copy the vnode index information
    memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx));

    // set the new offset value
    for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) {
      STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize];
      pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize;
      pBlockInfoEx->info.vnode = vnodeId;
    }

    pDestBuf->numOfVnodes = newSize;
  } else {
    STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf);

    pBlockInfoEx->len += pSrcBuf->pData[0].len;
    pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks;
    pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen;
    pBlockInfoEx->info.vnode = vnodeId;
  }

  int32_t r = fseek(pDestBuf->f, 0, SEEK_END);
  assert(r == 0);

  // copy everything behind the header and the vnode info table of the source file
  int64_t offset = getDataStartOffset();
  int32_t size = pSrcBuf->fileSize - offset;

#ifdef LINUX
  ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size);
#else
  ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size);
#endif

  if (rc == -1) {
    return -1;
  }

  if (rc != size) {
    return -1;
  }

  pDestBuf->numOfTotal += pSrcBuf->numOfTotal;

  int32_t oldSize = pDestBuf->fileSize;

  struct stat fileStat;
  fstat(fileno(pDestBuf->f), &fileStat);
  pDestBuf->fileSize = (uint32_t)fileStat.st_size;

  assert(pDestBuf->fileSize == oldSize + size);

  return 0;
}
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) {
STSBuf* pTSBuf = tsBufCreate(true);
STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info);
pBlockInfo->numOfBlocks = numOfBlocks;
pBlockInfo->compLen = len;
pBlockInfo->offset = getDataStartOffset();
pBlockInfo->vnode = 0;
// update prev vnode length info in file
TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo);
fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET);
fwrite((void*)pData, 1, len, pTSBuf->f);
pTSBuf->fileSize += len;
pTSBuf->tsOrder = order;
assert(order == TSQL_SO_ASC || order == TSQL_SO_DESC);
STSBufFileHeader header = {
.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder};
STSBufUpdateHeader(pTSBuf, &header);
fsync(fileno(pTSBuf->f));
return pTSBuf;
}
/**
 * Position the cursor on the first block, in traverse order, of the given vnode that carries
 * the given tag, and return the element at that position.
 *
 * @param pTSBuf   buffer to search, may be NULL
 * @param vnodeId  id of the vnode
 * @param tag      tag value to look for
 * @return the element at the new cursor position; its vnode field is -1 if the buffer is
 *         NULL, the vnode is unknown or no block with that tag exists
 */
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) {
  STSElem notFound = {.vnode = -1};

  if (pTSBuf == NULL) {
    return notFound;
  }

  const int32_t vnodeIdx = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId);
  if (vnodeIdx == -1) {
    return notFound;
  }

  const int32_t blockIdx = tsBufFindBlockByTag(pTSBuf, &pTSBuf->pData[vnodeIdx].info, tag);
  if (blockIdx < 0) {
    return notFound;
  }

  // the cursor is placed on the block first, tsBufGetBlock then decompresses it
  pTSBuf->cur.vnodeIndex = vnodeIdx;
  pTSBuf->cur.blockIndex = blockIdx;
  tsBufGetBlock(pTSBuf, vnodeIdx, blockIdx);

  return tsBufGetElem(pTSBuf);
}
/**
 * Return a copy of the current cursor.
 *
 * @param pTSBuf  buffer to query, may be NULL
 * @return the cursor of the buffer; for a NULL buffer a cursor whose vnodeIndex is -1 and
 *         whose other fields are 0
 */
STSCursor tsBufGetCursor(STSBuf* pTSBuf) {
  if (pTSBuf != NULL) {
    return pTSBuf->cur;
  }

  STSCursor unset = {.vnodeIndex = -1};
  return unset;
}
/**
 * Restore a cursor, loading the block it refers to when the cursor is positioned.
 *
 * @param pTSBuf  buffer whose cursor is set, may be NULL
 * @param pCur    cursor to restore, may be NULL
 */
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) {
  if (pTSBuf != NULL && pCur != NULL) {
    if (pCur->vnodeIndex != -1) {
      tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex);
    }

    // loading the block moves the cursor to the start of the block, so copy the saved one last
    pTSBuf->cur = *pCur;
  }
}
/**
 * Set the order in which the cursor walks through the buffer.
 *
 * @param pTSBuf  buffer to configure, may be NULL
 * @param order   traverse order stored in the cursor
 */
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) {
  if (pTSBuf != NULL) {
    pTSBuf->cur.order = order;
  }
}
/**
 * Open a second buffer object on the backing file of an existing buffer.
 * The clone never removes the file when it is destroyed.
 *
 * @param pTSBuf  buffer to clone, may be NULL
 * @return the new buffer, or NULL if pTSBuf is NULL or the file cannot be opened as a buffer
 */
STSBuf* tsBufClone(STSBuf* pTSBuf) {
  return (pTSBuf == NULL) ? NULL : tsBufCreateFromFile(pTSBuf->path, false);
}
/**
 * Print every element of the buffer as "vnode-tag-timestamp", one per line, in ascending
 * traverse order. For debugging only: the cursor position is lost, the traverse order is restored.
 *
 * @param pTSBuf  buffer to print; nothing is printed for NULL
 */
void tsBufDisplay(STSBuf* pTSBuf) {
  if (pTSBuf == NULL) {
    return;
  }

  printf("-------start of ts comp file-------\n");
  printf("number of vnode:%d\n", pTSBuf->numOfVnodes);

  int32_t old = pTSBuf->cur.order;
  pTSBuf->cur.order = TSQL_SO_ASC;

  tsBufResetPos(pTSBuf);

  while (tsBufNextPos(pTSBuf)) {
    STSElem elem = tsBufGetElem(pTSBuf);
    // the tag is an integer value, not a pointer: print it directly
    printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, elem.tag, elem.ts);
  }

  pTSBuf->cur.order = old;
  printf("-------end of ts comp file-------\n");
}
/* File offset of the first data block: the header plus the reserved vnode info table. */
static int32_t getDataStartOffset() {
  const size_t headerBytes = sizeof(STSBufFileHeader);
  const size_t vnodeTableBytes = TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo);

  return headerBytes + vnodeTableBytes;
}
/*
 * Write one vnode info entry at the given file offset.
 *
 * Returns 0 on success. Returns -1 if the offset is negative or not below the start of the
 * data blocks, or if seeking/writing fails.
 */
static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) {
  if (offset < 0 || offset >= getDataStartOffset()) {
    return -1;
  }

  if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) {
    return -1;
  }

  if (fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f) != 1) {
    return -1;
  }

  return 0;
}
/*
 * Write pBlockInfo into slot `index` of the vnode info table that follows the file header.
 * A failure to write is not reported to the caller.
 */
static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) {
  doUpdateVnodeInfo(pTSBuf, (int32_t)(sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo)), pBlockInfo);
}
/*
 * Allocate the in-memory resources of a buffer whose file is already open: the vnode index
 * array, the raw timestamp buffer, the assist buffer and the block payload buffer. The file
 * size is advanced by the space reserved for the header and the vnode info table.
 *
 * Returns pTSBuf on success. On allocation failure pTSBuf is destroyed and NULL is returned.
 */
static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) {
  enum { INITIAL_VNODEINFO_SIZE = 4 };

  pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE;
  pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx));
  if (pTSBuf->pData == NULL) {
    goto fail;
  }

  pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->tsData.rawBuf == NULL) {
    goto fail;
  }

  pTSBuf->bufSize = MEM_BUF_SIZE;
  pTSBuf->tsData.threshold = MEM_BUF_SIZE;
  pTSBuf->tsData.allocSize = MEM_BUF_SIZE;

  pTSBuf->assistBuf = malloc(MEM_BUF_SIZE);
  if (pTSBuf->assistBuf == NULL) {
    goto fail;
  }

  pTSBuf->block.payload = malloc(MEM_BUF_SIZE);
  if (pTSBuf->block.payload == NULL) {
    goto fail;
  }

  pTSBuf->fileSize += getDataStartOffset();
  return pTSBuf;

fail:
  tsBufDestory(pTSBuf);
  return NULL;
}
\ No newline at end of file
......@@ -137,6 +137,13 @@ int64_t str2int64(char *str);
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath);
/**
 * Build a temporary file path of the form
 * "<tmp dir>tdengine-<fileNamePattern>-<thread id>-<serial number>".
 *
 * @param fileNamePattern  name fragment placed right after the "tdengine-" prefix
 * @param dstPath          output buffer; the implementation writes at most PATH_MAX bytes into it
 */
void getTmpfilePath(const char *fileNamePattern, char *dstPath);
int32_t taosInitTimer(void (*callback)(int), int32_t ms);
bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len);
......
......@@ -520,6 +520,7 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
}
pIter->pHashObj = pHashObj;
return pIter;
}
static SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
......@@ -600,6 +601,7 @@ void *taosHashDestroyIter(SHashMutableIterator *iter) {
}
free(iter);
return NULL;
}
// for profile only
......
......@@ -27,6 +27,8 @@
#include "tlog.h"
#include "taoserror.h"
int32_t tmpFileSerialNum = 0;
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......@@ -401,6 +403,27 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath);
}
/**
 * Build a temporary file path of the form
 * "<tmp dir>tdengine-<fileNamePrefix>-<thread id>-<serial number>" in dstPath.
 *
 * The serial number comes from the process-wide counter tmpFileSerialNum, which is
 * incremented atomically on every call.
 *
 * @param fileNamePrefix  name fragment placed after the "tdengine-" prefix; it is copied
 *                        literally (it is never interpreted as a format string)
 * @param dstPath         output buffer of at least PATH_MAX bytes; the result is truncated to
 *                        that size and always NUL-terminated
 */
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
  const char *tdengineTmpFileNamePrefix = "tdengine-";

#ifdef WINDOWS
  // NOTE(review): the value of %tmp% is used as is, no path separator is appended - confirm this is intended
  const char *tmpDir = getenv("tmp");
  if (tmpDir == NULL) {
    tmpDir = "";
  }
#else
  const char *tmpDir = "/tmp/";
#endif

  // format directly into the destination: no unbounded strcpy/strcat into an intermediate
  // buffer, and the caller supplied prefix is passed as an argument, not as part of the format
  snprintf(dstPath, PATH_MAX, "%s%s%s-%llu-%u", tmpDir, tdengineTmpFileNamePrefix, fileNamePrefix,
           (unsigned long long)taosGetPthreadId(), (unsigned int)atomic_add_fetch_32(&tmpFileSerialNum, 1));
}
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {
#if defined WINDOWS
for (int i = 0; i < bytes; ++i) {
......
......@@ -16,11 +16,9 @@
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "os.h"
#include "mnode.h"
#include "tast.h"
#include "qast.h"
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes);
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
......
......@@ -19,9 +19,9 @@
#include "hash.h"
#include "hashfunc.h"
#include "ihash.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "tast.h"
#include "tscJoinProcess.h"
#include "tscompression.h"
#include "vnode.h"
......
......@@ -16,11 +16,10 @@
#define _DEFAULT_SOURCE
#include "mnode.h"
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "tast.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
//#include "vnodeTagMgmt.h"
typedef struct SSyntaxTreeFilterSupporter {
SSchema* pTagSchema;
......
......@@ -16,10 +16,10 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tutil.h"
#include "vnodeTagMgmt.h"
......
......@@ -16,13 +16,13 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tast.h"
#include "qast.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "vnode.h"
#include "vnodeDataFilterFunc.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
#include "vnodeUtil.h"
int vnodeCheckFileIntegrity(FILE* fp) {
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册