diff --git a/src/client/inc/tscCache.h b/src/client/inc/tscCache.h deleted file mode 100644 index 4c6acec096c01db64b09c4f0d18f404b8825f7b6..0000000000000000000000000000000000000000 --- a/src/client/inc/tscCache.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_TSCCACHE_H -#define TDENGINE_TSCCACHE_H - -#ifdef __cplusplus -extern "C" { -#endif - -void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer); - -void taosCloseConnCache(void *handle); - -void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user); - -void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user); - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_TSCACHE_H diff --git a/src/client/inc/tscJoinProcess.h b/src/client/inc/tscJoinProcess.h index 34764e4db62469af14592a026015c88b53a03fa5..bc1101df276c4d4ffe52143fb79bbd0acaae2ec8 100644 --- a/src/client/inc/tscJoinProcess.h +++ b/src/client/inc/tscJoinProcess.h @@ -24,7 +24,6 @@ extern "C" { #include "tsclient.h" void tscFetchDatablockFromSubquery(SSqlObj* pSql); -void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num); void tscSetupOutputColumnIndex(SSqlObj* pSql); int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql); @@ -33,121 +32,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code); SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index); void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter); -#define MEM_BUF_SIZE (1<<20) -#define TS_COMP_BLOCK_PADDING 0xFFFFFFFF -#define TS_COMP_FILE_MAGIC 0x87F5EC4C -#define TS_COMP_FILE_VNODE_MAX 512 - -typedef struct STSList { - char* rawBuf; - int32_t allocSize; - int32_t threshold; - int32_t len; -} STSList; - -typedef struct STSRawBlock { - int32_t vnode; - int64_t tag; - TSKEY* ts; - int32_t len; -} STSRawBlock; - -typedef struct STSElem { - TSKEY ts; - int64_t tag; - int32_t vnode; -} STSElem; - -typedef struct STSCursor { - int32_t vnodeIndex; - int32_t blockIndex; - int32_t tsIndex; - int32_t order; -} STSCursor; - -typedef struct STSBlock { - int64_t tag; // tag value - int32_t numOfElem; // number of elements - int32_t compLen; // size after compressed - int32_t padding; // 0xFFFFFFFF by default, after the payload - char* payload; // actual data that is compressed -} STSBlock; - -typedef struct STSVnodeBlockInfo { - int32_t vnode; - - /* - * The size of buffer file is not expected to be greater than 2G, - * and the offset of int32_t type is enough - */ - int32_t offset; - int32_t numOfBlocks; - int32_t compLen; -} STSVnodeBlockInfo; - -typedef struct STSVnodeBlockInfoEx { - STSVnodeBlockInfo info; - int32_t len; // length before compress -} STSVnodeBlockInfoEx; - -typedef struct STSBuf { - FILE* f; - char path[PATH_MAX]; - uint32_t fileSize; - - STSVnodeBlockInfoEx* pData; - int32_t numOfAlloc; - int32_t numOfVnodes; - - char* assistBuf; - int32_t bufSize; - STSBlock block; - STSList tsData; // uncompressed raw ts data - - uint64_t numOfTotal; - bool autoDelete; - int32_t tsOrder; // order of timestamp in ts comp buffer - - STSCursor cur; -} STSBuf; - -typedef struct STSBufFileHeader { - uint32_t magic; // file magic number - uint32_t numOfVnode; // number of vnode stored in current file - uint32_t tsOrder; // timestamp order in current file -} STSBufFileHeader; - -STSBuf* tsBufCreate(bool autoDelete); -STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); - -void* tsBufDestory(STSBuf* pTSBuf); - -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); - -STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); - -void tsBufFlush(STSBuf* pTSBuf); - -void tsBufResetPos(STSBuf* pTSBuf); -STSElem tsBufGetElem(STSBuf* pTSBuf); -bool tsBufNextPos(STSBuf* pTSBuf); - -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag); - -STSCursor tsBufGetCursor(STSBuf* pTSBuf); -void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); - -void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); -STSBuf* tsBufClone(STSBuf* pTSBuf); - -/** - * display all data in comp block file, for debug purpose only - * @param pTSBuf - */ -void tsBufDisplay(STSBuf* pTSBuf); - #ifdef __cplusplus } #endif diff --git a/src/client/inc/tscSQLParser.h b/src/client/inc/tscSQLParser.h deleted file mode 100644 index c7f8ba06e8fc20ea4be57784861ad86b4089aad2..0000000000000000000000000000000000000000 --- a/src/client/inc/tscSQLParser.h +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_TSQL_H -#define TDENGINE_TSQL_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "taos.h" -#include "taosmsg.h" -#include "ttokendef.h" -#include "taosdef.h" -#include "tvariant.h" -#include "qsqlparser.h" - -enum { - TSQL_NODE_TYPE_EXPR = 0x1, - TSQL_NODE_TYPE_ID = 0x2, - TSQL_NODE_TYPE_VALUE = 0x4, -}; - -#define NON_ARITHMEIC_EXPR 0 -#define NORMAL_ARITHMETIC 1 -#define AGG_ARIGHTMEIC 2 - -int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); - -#ifdef __cplusplus -} -#endif - -#endif diff --git a/src/client/inc/tscSecondaryMerge.h b/src/client/inc/tscSecondaryMerge.h index 08d995c9f3d789a82f5b8fa1331d8653a017181b..5370d0ec5259c3e47e065d4fe8b37884bdf6a050 100644 --- a/src/client/inc/tscSecondaryMerge.h +++ b/src/client/inc/tscSecondaryMerge.h @@ -20,9 +20,9 @@ extern "C" { #endif +#include "qextbuffer.h" +#include "qinterpolation.h" #include "taosmsg.h" -#include "textbuffer.h" -#include "tinterpolation.h" #include "tlosertree.h" #include "tsclient.h" diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index 01537d716b5e22e51a5b6b0746a81a1cb4ec2d60..eaefbdd85f7451a4387a067ec826e6e0f838d503 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -24,10 +24,10 @@ extern "C" { * @date 2018/09/30 */ #include "os.h" -#include "textbuffer.h" +#include "qextbuffer.h" +#include "taosdef.h" #include "tscSecondaryMerge.h" #include "tsclient.h" -#include "taosdef.h" #define UTIL_METER_IS_SUPERTABLE(metaInfo) \ (((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_SUPER_TABLE)) @@ -252,6 +252,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows); void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()); +int32_t launchMultivnodeInsert(SSqlObj *pSql); #ifdef __cplusplus } diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index caec0fdbb8c444e865610ac1e08b992decfc31b3..329c5743708b150eb1a733d4592c1cc6bca319de 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -25,13 +25,13 @@ extern "C" { #include "taosmsg.h" #include "tglobalcfg.h" #include "tlog.h" -#include "tscCache.h" -#include "tscSQLParser.h" #include "taosdef.h" #include "tsqlfunction.h" #include "tutil.h" #include "trpc.h" #include "qsqltype.h" +#include "qsqlparser.h" +#include "qtsbuf.h" #define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows) @@ -308,14 +308,14 @@ typedef struct _tsc_obj { char sversion[TSDB_VERSION_LEN]; char writeAuth : 1; char superAuth : 1; - struct _sql_obj *pSql; - struct _sql_obj *pHb; - struct _sql_obj *sqlList; + struct SSqlObj *pSql; + struct SSqlObj *pHb; + struct SSqlObj *sqlList; struct _sstream *streamList; pthread_mutex_t mutex; } STscObj; -typedef struct _sql_obj { +typedef struct SSqlObj { void * signature; STscObj *pTscObj; void (*fp)(); @@ -340,8 +340,8 @@ typedef struct _sql_obj { uint8_t numOfSubs; char * asyncTblPos; void * pTableHashList; - struct _sql_obj **pSubs; - struct _sql_obj * prev, *next; + struct SSqlObj **pSubs; + struct SSqlObj * prev, *next; } SSqlObj; typedef struct _sstream { @@ -442,10 +442,8 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd); int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql); -// transfer SSqlInfo to SqlCmd struct -int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo); - void tscQueueAsyncFreeResult(SSqlObj *pSql); +int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo); extern void * pVnodeConn; extern void * pTscMgmtConn; @@ -453,7 +451,6 @@ extern void * tscCacheHandle; extern int32_t globalCode; extern int slaveIndex; extern void * tscTmr; -extern void * tscConnCache; extern void * tscQhandle; extern int tscKeepConn[]; extern int tsInsertHeadSize; diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c index f1630ef294cd33fb80d096892120fe9f162203bc..77fe94210035b47c1e0eccc8ca54409a9e66920c 100644 --- a/src/client/src/tscAsync.c +++ b/src/client/src/tscAsync.c @@ -22,7 +22,6 @@ #include "tscUtil.h" #include "tsclient.h" #include "tsocket.h" -#include "tscSQLParser.h" #include "tutil.h" #include "tnote.h" #include "tsched.h" diff --git a/src/client/src/tscCache.c b/src/client/src/tscCache.c deleted file mode 100644 index 666d069a58c936e9028b46f9e6244923ac4be993..0000000000000000000000000000000000000000 --- a/src/client/src/tscCache.c +++ /dev/null @@ -1,264 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" - -#include "tglobalcfg.h" -#include "tlog.h" -#include "tmempool.h" -#include "tsclient.h" -#include "ttime.h" -#include "ttimer.h" -#include "tutil.h" - -typedef struct _c_hash_t { - uint32_t ip; - uint16_t port; - struct _c_hash_t *prev; - struct _c_hash_t *next; - void * data; - uint64_t time; -} SConnHash; - -typedef struct { - SConnHash ** connHashList; - mpool_h connHashMemPool; - int maxSessions; - int total; - int * count; - int64_t keepTimer; - pthread_mutex_t mutex; - void (*cleanFp)(void *); - void *tmrCtrl; - void *pTimer; -} SConnCache; - -int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) { - SConnCache *pObj = (SConnCache *)handle; - int hash = 0; - // size_t user_len = strlen(user); - - hash = ip >> 16; - hash += (unsigned short)(ip & 0xFFFF); - hash += port; - while (*user != '\0') { - hash += *user; - user++; - } - - hash = hash % pObj->maxSessions; - - return hash; -} - -void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) { - if (pNode == NULL) return; - if (time < pObj->keepTimer + pNode->time) return; - - SConnHash *pPrev = pNode->prev, *pNext; - - while (pNode) { - (*pObj->cleanFp)(pNode->data); - pNext = pNode->next; - pObj->total--; - pObj->count[hash]--; - tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode, - pObj->count[hash]); - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); - pNode = pNext; - } - - if (pPrev) - pPrev->next = NULL; - else - pObj->connHashList[hash] = NULL; -} - -void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - - uint64_t time = taosGetTimestampMs(); - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - if (data == NULL) { - tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port); - return NULL; - } - - hash = taosHashConn(pObj, ip, port, user); - pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool); - pNode->ip = ip; - pNode->port = port; - pNode->data = data; - pNode->prev = NULL; - pNode->time = time; - - pthread_mutex_lock(&pObj->mutex); - - pNode->next = pObj->connHashList[hash]; - if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode; - pObj->connHashList[hash] = pNode; - - pObj->total++; - pObj->count[hash]++; - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); - - pthread_mutex_unlock(&pObj->mutex); - - tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]); - - return pObj; -} - -void taosCleanConnCache(void *handle, void *tmrId) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - if (pObj->pTimer != tmrId) return; - - uint64_t time = taosGetTimestampMs(); - - for (hash = 0; hash < pObj->maxSessions; ++hash) { - pthread_mutex_lock(&pObj->mutex); - pNode = pObj->connHashList[hash]; - taosRemoveExpiredNodes(pObj, pNode, hash, time); - pthread_mutex_unlock(&pObj->mutex); - } - - // tscTrace("timer, total connections in cache:%d", pObj->total); - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); -} - -void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) { - int hash; - SConnHash * pNode; - SConnCache *pObj; - void * pData = NULL; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; - - uint64_t time = taosGetTimestampMs(); - - hash = taosHashConn(pObj, ip, port, user); - pthread_mutex_lock(&pObj->mutex); - - pNode = pObj->connHashList[hash]; - while (pNode) { - if (time >= pObj->keepTimer + pNode->time) { - taosRemoveExpiredNodes(pObj, pNode, hash, time); - pNode = NULL; - break; - } - - if (pNode->ip == ip && pNode->port == port) break; - - pNode = pNode->next; - } - - if (pNode) { - taosRemoveExpiredNodes(pObj, pNode->next, hash, time); - - if (pNode->prev) { - pNode->prev->next = pNode->next; - } else { - pObj->connHashList[hash] = pNode->next; - } - - if (pNode->next) { - pNode->next->prev = pNode->prev; - } - - pData = pNode->data; - taosMemPoolFree(pObj->connHashMemPool, (char *)pNode); - pObj->total--; - pObj->count[hash]--; - } - - pthread_mutex_unlock(&pObj->mutex); - - if (pData) { - tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]); - } - - return pData; -} - -void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) { - SConnHash **connHashList; - mpool_h connHashMemPool; - SConnCache *pObj; - - connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash)); - if (connHashMemPool == 0) return NULL; - - connHashList = calloc(sizeof(SConnHash *), maxSessions); - if (connHashList == 0) { - taosMemPoolCleanUp(connHashMemPool); - return NULL; - } - - pObj = malloc(sizeof(SConnCache)); - if (pObj == NULL) { - taosMemPoolCleanUp(connHashMemPool); - free(connHashList); - return NULL; - } - memset(pObj, 0, sizeof(SConnCache)); - - pObj->count = calloc(sizeof(int), maxSessions); - pObj->total = 0; - pObj->keepTimer = keepTimer; - pObj->maxSessions = maxSessions; - pObj->connHashMemPool = connHashMemPool; - pObj->connHashList = connHashList; - pObj->cleanFp = cleanFp; - pObj->tmrCtrl = tmrCtrl; - taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer); - - pthread_mutex_init(&pObj->mutex, NULL); - - return pObj; -} - -void taosCloseConnCache(void *handle) { - SConnCache *pObj; - - pObj = (SConnCache *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return; - - pthread_mutex_lock(&pObj->mutex); - - taosTmrStopA(&(pObj->pTimer)); - - if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool); - - tfree(pObj->connHashList); - tfree(pObj->count) - - pthread_mutex_unlock(&pObj->mutex); - - pthread_mutex_destroy(&pObj->mutex); - - memset(pObj, 0, sizeof(SConnCache)); - free(pObj); -} diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index 9c4f7e9c55b315263edaa4bbcadef11b6089593f..885b78010d388a70513f5e9234b67fdf82d36c14 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -14,20 +14,21 @@ */ #include "os.h" +#include "qast.h" +#include "qextbuffer.h" +#include "qhistogram.h" +#include "qinterpolation.h" +#include "qpercentile.h" +#include "qsyntaxtreefunction.h" +#include "qtsbuf.h" +#include "taosdef.h" #include "taosmsg.h" -#include "tast.h" -#include "textbuffer.h" -#include "thistogram.h" -#include "tinterpolation.h" #include "tlog.h" #include "tscJoinProcess.h" -#include "tscSyntaxtreefunction.h" #include "tscompression.h" #include "tsqlfunction.h" #include "ttime.h" -#include "taosdef.h" #include "tutil.h" -#include "tpercentile.h" #define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes)) #define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes) diff --git a/src/client/src/tscJoinProcess.c b/src/client/src/tscJoinProcess.c index afd8e98edac4522089e0aaafa02237da67bdd1cc..882c72ae8019c59ff2764f43a201c4c23086f2b1 100644 --- a/src/client/src/tscJoinProcess.c +++ b/src/client/src/tscJoinProcess.c @@ -15,12 +15,8 @@ #include "os.h" #include "tscJoinProcess.h" -#include "tcache.h" -#include "tscUtil.h" #include "tsclient.h" -#include "tscompression.h" -#include "ttime.h" -#include "tutil.h" +#include "qtsbuf.h" static void freeSubqueryObj(SSqlObj* pSql); @@ -813,919 +809,4 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) { } } } -} - -static int32_t getDataStartOffset() { - return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo); -} - -static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { - if (offset < 0 || offset >= getDataStartOffset()) { - return -1; - } - - if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) { - return -1; - } - - fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); - return 0; -} - -// update prev vnode length info in file -static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) { - int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo); - doUpdateVnodeInfo(pTSBuf, offset, pBlockInfo); -} - -static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { - const int32_t INITIAL_VNODEINFO_SIZE = 4; - - pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE; - pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx)); - if (pTSBuf->pData == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } - - pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); - if (pTSBuf->tsData.rawBuf == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } - - pTSBuf->bufSize = MEM_BUF_SIZE; - pTSBuf->tsData.threshold = MEM_BUF_SIZE; - pTSBuf->tsData.allocSize = MEM_BUF_SIZE; - - pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); - if (pTSBuf->assistBuf == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } - - pTSBuf->block.payload = malloc(MEM_BUF_SIZE); - if (pTSBuf->block.payload == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } - - pTSBuf->fileSize += getDataStartOffset(); - return pTSBuf; -} - -static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); - -/** - * todo error handling - * support auto closeable tmp file - * @param path - * @return - */ -STSBuf* tsBufCreate(bool autoDelete) { - STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); - if (pTSBuf == NULL) { - return NULL; - } - - getTmpfilePath("join", pTSBuf->path); - pTSBuf->f = fopen(pTSBuf->path, "w+"); - if (pTSBuf->f == NULL) { - free(pTSBuf); - return NULL; - } - - if (NULL == allocResForTSBuf(pTSBuf)) { - return NULL; - } - - // update the header info - STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC}; - STSBufUpdateHeader(pTSBuf, &header); - - tsBufResetPos(pTSBuf); - pTSBuf->cur.order = TSQL_SO_ASC; - - pTSBuf->autoDelete = autoDelete; - pTSBuf->tsOrder = -1; - - return pTSBuf; -} - -STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { - STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); - if (pTSBuf == NULL) { - return NULL; - } - - strncpy(pTSBuf->path, path, PATH_MAX); - - pTSBuf->f = fopen(pTSBuf->path, "r+"); - if (pTSBuf->f == NULL) { - free(pTSBuf); - return NULL; - } - - if (allocResForTSBuf(pTSBuf) == NULL) { - return NULL; - } - - // validate the file magic number - STSBufFileHeader header = {0}; - fseek(pTSBuf->f, 0, SEEK_SET); - fread(&header, 1, sizeof(header), pTSBuf->f); - - // invalid file - if (header.magic != TS_COMP_FILE_MAGIC) { - return NULL; - } - - if (header.numOfVnode > pTSBuf->numOfAlloc) { - pTSBuf->numOfAlloc = header.numOfVnode; - STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc); - if (tmp == NULL) { - tsBufDestory(pTSBuf); - return NULL; - } - - pTSBuf->pData = tmp; - } - - pTSBuf->numOfVnodes = header.numOfVnode; - - // check the ts order - pTSBuf->tsOrder = header.tsOrder; - if (pTSBuf->tsOrder != TSQL_SO_ASC && pTSBuf->tsOrder != TSQL_SO_DESC) { - tscError("invalid order info in buf:%d", pTSBuf->tsOrder); - tsBufDestory(pTSBuf); - return NULL; - } - - size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; - - STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); - - //int64_t pos = ftell(pTSBuf->f); //pos not used - fread(buf, infoSize, 1, pTSBuf->f); - - // the length value for each vnode is not kept in file, so does not set the length value - for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) { - STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i]; - memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo)); - } - - free(buf); - - fseek(pTSBuf->f, 0, SEEK_END); - - struct stat fileStat; - fstat(fileno(pTSBuf->f), &fileStat); - - pTSBuf->fileSize = (uint32_t)fileStat.st_size; - tsBufResetPos(pTSBuf); - - // ascending by default - pTSBuf->cur.order = TSQL_SO_ASC; - - pTSBuf->autoDelete = autoDelete; - - tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), - pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); - - return pTSBuf; -} - -void* tsBufDestory(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return NULL; - } - - tfree(pTSBuf->assistBuf); - tfree(pTSBuf->tsData.rawBuf); - - tfree(pTSBuf->pData); - tfree(pTSBuf->block.payload); - - fclose(pTSBuf->f); - - if (pTSBuf->autoDelete) { - tscTrace("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); - unlink(pTSBuf->path); - } else { - tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); - } - - free(pTSBuf); - return NULL; -} - -static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { - int32_t last = pTSBuf->numOfVnodes - 1; - - assert(last >= 0); - return &pTSBuf->pData[last]; -} - -static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { - if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) { - uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); - assert(newSize > pTSBuf->numOfAlloc); - - STSVnodeBlockInfoEx* tmp = (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); - if (tmp == NULL) { - return NULL; - } - - pTSBuf->pData = tmp; - pTSBuf->numOfAlloc = newSize; - memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes)); - } - - if (pTSBuf->numOfVnodes > 0) { - STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); - - // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info); - } - - // set initial value for vnode block - STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info; - pBlockInfo->vnode = vnodeId; - pBlockInfo->offset = pTSBuf->fileSize; - assert(pBlockInfo->offset >= getDataStartOffset()); - - // update vnode info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo); - - // add one vnode info - pTSBuf->numOfVnodes += 1; - - // update the header info - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; - - STSBufUpdateHeader(pTSBuf, &header); - return tsBufGetLastVnodeInfo(pTSBuf); -} - -static void shrinkBuffer(STSList* ptsData) { - // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size - if (ptsData->allocSize >= ptsData->threshold * 2) { - ptsData->rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE); - ptsData->allocSize = MEM_BUF_SIZE; - } -} - -static void writeDataToDisk(STSBuf* pTSBuf) { - if (pTSBuf->tsData.len == 0) { - return; - } - - STSBlock* pBlock = &pTSBuf->block; - - pBlock->numOfElem = pTSBuf->tsData.len / TSDB_KEYSIZE; - pBlock->compLen = - tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - - int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); - UNUSED(r); - - /* - * format for output data: - * 1. tags, number of ts, size after compressed, payload, size after compressed - * 2. tags, number of ts, size after compressed, payload, size after compressed - * - * both side has the compressed length is used to support load data forwards/backwords. - */ - fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); - fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); - - fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - - fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); - - fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - - int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; - pTSBuf->fileSize += blockSize; - - pTSBuf->tsData.len = 0; - - STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); - - pVnodeBlockInfoEx->info.compLen += blockSize; - pVnodeBlockInfoEx->info.numOfBlocks += 1; - - shrinkBuffer(&pTSBuf->tsData); -} - -static void expandBuffer(STSList* ptsData, int32_t inputSize) { - if (ptsData->allocSize - ptsData->len < inputSize) { - int32_t newSize = inputSize + ptsData->len; - char* tmp = realloc(ptsData->rawBuf, (size_t)newSize); - if (tmp == NULL) { - // todo - } - - ptsData->rawBuf = tmp; - ptsData->allocSize = newSize; - } -} - -STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { - STSBlock* pBlock = &pTSBuf->block; - - // clear the memory buffer - void* tmp = pBlock->payload; - memset(pBlock, 0, sizeof(STSBlock)); - pBlock->payload = tmp; - - if (order == TSQL_SO_DESC) { - /* - * set the right position for the reversed traverse, the reversed traverse is started from - * the end of each comp data block - */ - fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR); - fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); - - pBlock->compLen = pBlock->padding; - int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); - fseek(pTSBuf->f, -offset, SEEK_CUR); - } - - fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); - fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); - - fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); - fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); - - if (decomp) { - pTSBuf->tsData.len = - tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - } - - // read the comp length at the length of comp block - fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); - - // for backwards traverse, set the start position at the end of previous block - if (order == TSQL_SO_DESC) { - int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); - int64_t r = fseek(pTSBuf->f, -offset, SEEK_CUR); - UNUSED(r); - } - - return pBlock; -} - -// set the order of ts buffer if the ts order has not been set yet -static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { - STSList* ptsData = &pTSBuf->tsData; - - if (pTSBuf->tsOrder == -1) { - if (ptsData->len > 0) { - TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE); - - if (lastKey > *(TSKEY*)pData) { - pTSBuf->tsOrder = TSQL_SO_DESC; - } else { - pTSBuf->tsOrder = TSQL_SO_ASC; - } - } else if (len > TSDB_KEYSIZE) { - // no data in current vnode, more than one ts is added, check the orders - TSKEY k1 = *(TSKEY*)(pData); - TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE); - - if (k1 < k2) { - pTSBuf->tsOrder = TSQL_SO_ASC; - } else if (k1 > k2) { - pTSBuf->tsOrder = TSQL_SO_DESC; - } else { - // todo handle error - } - } - } else { - // todo the timestamp order is set, check the asc/desc order of appended data - } - - return TSDB_CODE_SUCCESS; -} - -void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) { - STSVnodeBlockInfoEx* pBlockInfo = NULL; - STSList* ptsData = &pTSBuf->tsData; - - if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { - writeDataToDisk(pTSBuf); - shrinkBuffer(ptsData); - - pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId); - } else { - pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf); - } - - assert(pBlockInfo->info.vnode == vnodeId); - - if (pTSBuf->block.tag != tag && ptsData->len > 0) { - // new arrived data with different tags value, save current value into disk first - writeDataToDisk(pTSBuf); - } else { - expandBuffer(ptsData, len); - } - - pTSBuf->block.tag = tag; - memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); - - // todo check return value - setCheckTSOrder(pTSBuf, pData, len); - - ptsData->len += len; - pBlockInfo->len += len; - - pTSBuf->numOfTotal += len / TSDB_KEYSIZE; - - // the size of raw data exceeds the size of the default prepared buffer, so - // during getBufBlock, the output buffer needs to be large enough. - if (ptsData->len >= ptsData->threshold) { - writeDataToDisk(pTSBuf); - shrinkBuffer(ptsData); - } - - tsBufResetPos(pTSBuf); -} - -void tsBufFlush(STSBuf* pTSBuf) { - if (pTSBuf->tsData.len <= 0) { - return; - } - - writeDataToDisk(pTSBuf); - shrinkBuffer(&pTSBuf->tsData); - - STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); - - // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info); - - // save the ts order into header - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; - STSBufUpdateHeader(pTSBuf, &header); - - fsync(fileno(pTSBuf->f)); -} - -static int32_t tsBufFindVnodeIndexFromId(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) { - int32_t j = -1; - for (int32_t i = 0; i < numOfVnodes; ++i) { - if (pVnodeInfoEx[i].info.vnode == vnodeId) { - j = i; - break; - } - } - - return j; -} - -// todo opt performance by cache blocks info -static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) { - if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) { - return -1; - } - - // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode - int32_t i = 0; - bool decomp = false; - - while ((i++) <= blockIndex) { - if (readDataFromDisk(pTSBuf, TSQL_SO_ASC, decomp) == NULL) { - return -1; - } - } - - // set the file position to be the end of previous comp block - if (pTSBuf->cur.order == TSQL_SO_DESC) { - STSBlock* pBlock = &pTSBuf->block; - int32_t compBlockSize = - pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); - fseek(pTSBuf->f, -compBlockSize, SEEK_CUR); - } - - return 0; -} - -static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) { - bool decomp = false; - - int64_t offset = 0; - if (pTSBuf->cur.order == TSQL_SO_ASC) { - offset = pBlockInfo->offset; - } else { // reversed traverse starts from the end of block - offset = pBlockInfo->offset + pBlockInfo->compLen; - } - - if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) { - return -1; - } - - for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) { - if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) { - return -1; - } - - if (pTSBuf->block.tag == tag) { - return i; - } - } - - return -1; -} - -static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) { - STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info; - if (pBlockInfo->numOfBlocks <= blockIndex) { - assert(false); - } - - STSCursor* pCur = &pTSBuf->cur; - if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSQL_SO_ASC) || - (pCur->blockIndex >= blockIndex && pCur->order == TSQL_SO_DESC))) { - int32_t i = 0; - bool decomp = false; - int32_t step = abs(blockIndex - pCur->blockIndex); - - while ((++i) <= step) { - if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) { - return; - } - } - } else { - if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) { - assert(false); - } - } - - STSBlock* pBlock = &pTSBuf->block; - - size_t s = pBlock->numOfElem * TSDB_KEYSIZE; - - /* - * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value - * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function - */ - if (s > pTSBuf->tsData.allocSize) { - expandBuffer(&pTSBuf->tsData, s); - } - - pTSBuf->tsData.len = - tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, - pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); - - assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); - - pCur->vnodeIndex = vnodeIndex; - pCur->blockIndex = blockIndex; - - pCur->tsIndex = (pCur->order == TSQL_SO_ASC) ? 0 : pBlock->numOfElem - 1; -} - -STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) { - int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); - if (j == -1) { - return NULL; - } - - return &pTSBuf->pData[j].info; -} - -int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { - if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode < 0 || pHeader->magic != TS_COMP_FILE_MAGIC) { - return -1; - } - - int64_t r = fseek(pTSBuf->f, 0, SEEK_SET); - if (r != 0) { - return -1; - } - - fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f); - return 0; -} - -bool tsBufNextPos(STSBuf* pTSBuf) { - if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) { - return false; - } - - STSCursor* pCur = &pTSBuf->cur; - - // get the first/last position according to traverse order - if (pCur->vnodeIndex == -1) { - if (pCur->order == TSQL_SO_ASC) { - tsBufGetBlock(pTSBuf, 0, 0); - - if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return - tsBufResetPos(pTSBuf); - return false; - } else { - return true; - } - - } else { // get the last timestamp record in the last block of the last vnode - assert(pTSBuf->numOfVnodes > 0); - - int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; - pCur->vnodeIndex = vnodeIndex; - - int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; - STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); - int32_t blockIndex = pBlockInfo->numOfBlocks - 1; - - tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex); - - pCur->tsIndex = pTSBuf->block.numOfElem - 1; - if (pTSBuf->block.numOfElem == 0) { - tsBufResetPos(pTSBuf); - return false; - } else { - return true; - } - } - } - - int32_t step = pCur->order == TSQL_SO_ASC ? 1 : -1; - - while (1) { - assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE); - - if ((pCur->order == TSQL_SO_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || - (pCur->order == TSQL_SO_DESC && pCur->tsIndex <= 0)) { - int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; - - STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); - if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSQL_SO_ASC) || - (pCur->blockIndex <= 0 && pCur->order == TSQL_SO_DESC)) { - if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSQL_SO_ASC) || - (pCur->vnodeIndex <= 0 && pCur->order == TSQL_SO_DESC)) { - pCur->vnodeIndex = -1; - return false; - } - - if (pBlockInfo == NULL) { - return false; - } - - int32_t blockIndex = pCur->order == TSQL_SO_ASC ? 0 : pBlockInfo->numOfBlocks - 1; - tsBufGetBlock(pTSBuf, pCur->vnodeIndex + step, blockIndex); - break; - - } else { - tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step); - break; - } - } else { - pCur->tsIndex += step; - break; - } - } - - return true; -} - -void tsBufResetPos(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return; - } - - pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vnodeIndex = -1, .order = pTSBuf->cur.order}; -} - -STSElem tsBufGetElem(STSBuf* pTSBuf) { - STSElem elem1 = {.vnode = -1}; - STSCursor* pCur = &pTSBuf->cur; - - if (pTSBuf == NULL || pCur->vnodeIndex < 0) { - return elem1; - } - - STSBlock* pBlock = &pTSBuf->block; - - elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; - elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); - elem1.tag = pBlock->tag; - - return elem1; -} - -/** - * current only support ts comp data from two vnode merge - * @param pDestBuf - * @param pSrcBuf - * @param vnodeId - * @return - */ -int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { - if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) { - return 0; - } - - if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) { - return -1; - } - - // src can only have one vnode index - if (pSrcBuf->numOfVnodes > 1) { - return -1; - } - - // there are data in buffer, flush to disk first - tsBufFlush(pDestBuf); - - // compared with the last vnode id - if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { - int32_t oldSize = pDestBuf->numOfVnodes; - int32_t newSize = oldSize + pSrcBuf->numOfVnodes; - - if (pDestBuf->numOfAlloc < newSize) { - pDestBuf->numOfAlloc = newSize; - - STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); - if (tmp == NULL) { - return -1; - } - - pDestBuf->pData = tmp; - } - - // directly copy the vnode index information - memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx)); - - // set the new offset value - for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) { - STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; - pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; - pBlockInfoEx->info.vnode = vnodeId; - } - - pDestBuf->numOfVnodes = newSize; - } else { - STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); - - pBlockInfoEx->len += pSrcBuf->pData[0].len; - pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; - pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; - pBlockInfoEx->info.vnode = vnodeId; - } - - int32_t r = fseek(pDestBuf->f, 0, SEEK_END); - assert(r == 0); - - int64_t offset = getDataStartOffset(); - int32_t size = pSrcBuf->fileSize - offset; - -#ifdef LINUX - ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); -#else - ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); -#endif - - if (rc == -1) { - tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); - return -1; - } - - if (rc != size) { - tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); - return -1; - } - - pDestBuf->numOfTotal += pSrcBuf->numOfTotal; - - int32_t oldSize = pDestBuf->fileSize; - - struct stat fileStat; - fstat(fileno(pDestBuf->f), &fileStat); - pDestBuf->fileSize = (uint32_t)fileStat.st_size; - - assert(pDestBuf->fileSize == oldSize + size); - - tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, - pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); - - return 0; -} - -STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) { - STSBuf* pTSBuf = tsBufCreate(true); - - STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); - pBlockInfo->numOfBlocks = numOfBlocks; - pBlockInfo->compLen = len; - pBlockInfo->offset = getDataStartOffset(); - pBlockInfo->vnode = 0; - - // update prev vnode length info in file - TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); - - fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); - fwrite((void*)pData, 1, len, pTSBuf->f); - pTSBuf->fileSize += len; - - pTSBuf->tsOrder = order; - assert(order == TSQL_SO_ASC || order == TSQL_SO_DESC); - - STSBufFileHeader header = { - .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; - STSBufUpdateHeader(pTSBuf, &header); - - fsync(fileno(pTSBuf->f)); - - return pTSBuf; -} - -STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { - STSElem elem = {.vnode = -1}; - - if (pTSBuf == NULL) { - return elem; - } - - int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); - if (j == -1) { - return elem; - } - - // for debug purpose - // tsBufDisplay(pTSBuf); - - STSCursor* pCur = &pTSBuf->cur; - STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; - - int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); - if (blockIndex < 0) { - return elem; - } - - pCur->vnodeIndex = j; - pCur->blockIndex = blockIndex; - tsBufGetBlock(pTSBuf, j, blockIndex); - - return tsBufGetElem(pTSBuf); -} - -STSCursor tsBufGetCursor(STSBuf* pTSBuf) { - STSCursor c = {.vnodeIndex = -1}; - if (pTSBuf == NULL) { - return c; - } - - return pTSBuf->cur; -} - -void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { - if (pTSBuf == NULL || pCur == NULL) { - return; - } - - // assert(pCur->vnodeIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); - if (pCur->vnodeIndex != -1) { - tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex); - } - - pTSBuf->cur = *pCur; -} - -void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) { - if (pTSBuf == NULL) { - return; - } - - pTSBuf->cur.order = order; -} - -STSBuf* tsBufClone(STSBuf* pTSBuf) { - if (pTSBuf == NULL) { - return NULL; - } - - return tsBufCreateFromFile(pTSBuf->path, false); -} - -void tsBufDisplay(STSBuf* pTSBuf) { - printf("-------start of ts comp file-------\n"); - printf("number of vnode:%d\n", pTSBuf->numOfVnodes); - - int32_t old = pTSBuf->cur.order; - pTSBuf->cur.order = TSQL_SO_ASC; - - tsBufResetPos(pTSBuf); - - while (tsBufNextPos(pTSBuf)) { - STSElem elem = tsBufGetElem(pTSBuf); - printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts); - } - - pTSBuf->cur.order = old; - printf("-------end of ts comp file-------\n"); -} +} \ No newline at end of file diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index ee6dc752d490416f5aeca716cf0a9d2ac60096e3..bc893d30049cd560016c563fc8c560e193afc89d 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -21,7 +21,7 @@ #include "tsclient.h" #include "taosdef.h" -#include "textbuffer.h" +#include "qextbuffer.h" #include "tscSecondaryMerge.h" #include "tschemautil.h" #include "tsocket.h" diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 1b7ed4002be01875fa30226b3bb6e0be348c739d..6c26437b80caefe2a79a522e401098f9edda6124 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -1314,11 +1314,10 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) { * the error handle callback function can rightfully restore the user defined function (fp) */ if (pSql->fp != NULL && multiVnodeInsertion) { - assert(pSql->fetchFp == NULL); pSql->fetchFp = pSql->fp; // replace user defined callback function with multi-insert proxy function - pSql->fp = tscAsyncInsertMultiVnodesProxy; + pSql->fp = (void(*)())launchMultivnodeInsert; } ret = tsParseInsertSql(pSql); diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c index cb991691f5473cb7b8c528f017508d53df7b45b2..a20d7cad1c983ceb54ac3c7c66f19866143d1840 100644 --- a/src/client/src/tscPrepare.c +++ b/src/client/src/tscPrepare.c @@ -15,7 +15,6 @@ #include "taos.h" #include "tsclient.h" -#include "tscSQLParser.h" #include "tscUtil.h" #include "ttimer.h" #include "taosmsg.h" diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c index 419390379505a966096b733d8b75772b97c9ebed..e814965b13d586e11fce309490c15efaddc0428f 100644 --- a/src/client/src/tscSQLParser.c +++ b/src/client/src/tscSQLParser.c @@ -17,17 +17,17 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qast.h" #include "taos.h" #include "taosmsg.h" #include "tstoken.h" #include "tstrbuild.h" #include "ttime.h" -#include "tast.h" -#include "tscSQLParser.h" #include "tscUtil.h" #include "tschemautil.h" #include "tsclient.h" +#include "ttokendef.h" #define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0" @@ -59,7 +59,7 @@ static int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pD static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength); static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName); -static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn); +static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool finalResult); static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes, int8_t type, char* fieldName, SSqlExpr* pSqlExpr); static int32_t changeFunctionID(int32_t optr, int16_t* functionId); diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 8660dd0e4e38e4062562b37a71e58e8435294203..ed905a8d544197a7474e7fc4256d1bc4e676750d 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -18,7 +18,6 @@ #include "trpc.h" #include "tscJoinProcess.h" #include "tscProfile.h" -#include "tscSQLParser.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tschemautil.h" @@ -605,7 +604,7 @@ int tscProcessSql(SSqlObj *pSql) { } } } - + if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) { /* * (ref. line: 964) @@ -615,24 +614,16 @@ int tscProcessSql(SSqlObj *pSql) { * when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL, * which causes deadlock. So we keep it as local variable. */ - void *fp = pSql->fp; - if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) { return pRes->code; } - - if (fp == NULL) { - tsem_post(&pSql->emptyRspSem); - tsem_wait(&pSql->rspSem); - tsem_post(&pSql->emptyRspSem); - - // set the command flag must be after the semaphore been correctly set. - pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC; - } - + + return pSql->res.code; + } else if (pSql->fp == (void(*)())launchMultivnodeInsert) { // multi-vnodes insertion + launchMultivnodeInsert(pSql); return pSql->res.code; } - + return doProcessSql(pSql); } diff --git a/src/client/src/tscSql.c b/src/client/src/tscSql.c index bbe2fa8d3a50516a884b5404affab3f53bc5d430..930f054ff3984ed6228338e3ac562fb2516d4997 100644 --- a/src/client/src/tscSql.c +++ b/src/client/src/tscSql.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include #include "hash.h" #include "os.h" #include "tcache.h" @@ -22,7 +21,6 @@ #include "trpc.h" #include "tscJoinProcess.h" #include "tscProfile.h" -#include "tscSQLParser.h" #include "tscSecondaryMerge.h" #include "tscUtil.h" #include "tsclient.h" @@ -30,6 +28,8 @@ #include "tsocket.h" #include "ttimer.h" #include "tutil.h" +#include "ttokendef.h" +#include "qast.h" TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port, void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) { diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 5fd0adf5b15b31abe741dd51025e8e0a5a211230..d3ceb5ee84cb3987d487a30aaba4ae1f2cd7c663 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -15,7 +15,6 @@ #include "os.h" #include "tlog.h" -#include "tscSQLParser.h" #include "ttime.h" #include "ttimer.h" #include "tutil.h" diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index 866398b7f5cfeadf1d5eda6d1354128ce8ed1c75..426570134396956d06122bfd593226cac82a224f 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -38,7 +38,6 @@ int initialized = 0; int slaveIndex; void * tscTmr; void * tscQhandle; -void * tscConnCache; void * tscCheckDiskUsageTmr; int tsInsertHeadSize; @@ -188,8 +187,6 @@ void taos_init_imp() { if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); - tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); - initialized = 1; tscTrace("client is initialized successfully"); tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b655832f1159efe399cfeec94f8cc00cbb671f38..c970b1db00fc8432a412b704c708d141ed46ebd9 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -16,8 +16,8 @@ #include "tscUtil.h" #include "hash.h" #include "os.h" +#include "qast.h" #include "taosmsg.h" -#include "tast.h" #include "tcache.h" #include "tkey.h" #include "tmd5.h" @@ -2105,7 +2105,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void void tscDoQuery(SSqlObj* pSql) { SSqlCmd* pCmd = &pSql->cmd; - void* fp = pSql->fp; pSql->res.code = TSDB_CODE_SUCCESS; @@ -2121,7 +2120,6 @@ void tscDoQuery(SSqlObj* pSql) { } else { // pSql may be released in this function if it is a async insertion. tscProcessSql(pSql); - if (NULL == fp) tscProcessMultiVnodesInsert(pSql); } } } @@ -2321,3 +2319,94 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) { tscProcessSql(pSql); } } + +typedef struct SinsertSupporter { + SSubqueryState* pState; + SSqlObj* pSql; +} SinsertSupporter; + +void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) { + SinsertSupporter *pSupporter = (SinsertSupporter *)param; + SSqlObj* pParentObj = pSupporter->pSql; + SSqlCmd* pParentCmd = &pParentObj->cmd; + + SSubqueryState* pState = pSupporter->pState; + int32_t total = pState->numOfTotal; + + // increase the total inserted rows + if (numOfRows > 0) { + pParentObj->res.numOfRows += numOfRows; + } + + int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1); + if (completed < total) { + return; + } + + tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows); + + // release data block data + pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks); + + // restore user defined fp + pParentObj->fp = pParentObj->fetchFp; + + // all data has been sent to vnode, call user function + (*pParentObj->fp)(pParentObj->param, tres, numOfRows); +} + +int32_t launchMultivnodeInsert(SSqlObj *pSql) { + SSqlRes *pRes = &pSql->res; + SSqlCmd *pCmd = &pSql->cmd; + + pRes->qhandle = 1; // hack the qhandle check + SDataBlockList *pDataBlocks = pCmd->pDataBlocks; + + pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES); + pSql->numOfSubs = pDataBlocks->nSize; + assert(pDataBlocks->nSize > 0); + + tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize); + SSubqueryState *pState = calloc(1, sizeof(SSubqueryState)); + pState->numOfTotal = pSql->numOfSubs; + + pRes->code = TSDB_CODE_SUCCESS; + + int32_t i = 0; + for (; i < pSql->numOfSubs; ++i) { + SinsertSupporter* pSupporter = calloc(1, sizeof(SinsertSupporter)); + pSupporter->pSql = pSql; + pSupporter->pState = pState; + + SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL); + if (pNew == NULL) { + tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno)); + break; + } + + pSql->pSubs[i] = pNew; + tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i); + } + + if (i < pSql->numOfSubs) { + tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql); + pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY; + return pRes->code; // free all allocated resource + } + + for (int32_t j = 0; j < pSql->numOfSubs; ++j) { + SSqlObj *pSub = pSql->pSubs[j]; + pSub->cmd.command = TSDB_SQL_INSERT; + int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]); + + if (code != TSDB_CODE_SUCCESS) { + tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j, + pDataBlocks->nSize, code); + } + + tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j); + tscProcessSql(pSub); + } + + return TSDB_CODE_SUCCESS; +} diff --git a/src/inc/tpercentile.h b/src/inc/qpercentile.h similarity index 98% rename from src/inc/tpercentile.h rename to src/inc/qpercentile.h index b9cf50e0bbf24357b729f8bc39996f589d6c18fc..73430bd05ccb07df93c1fc9869539b504c6e618e 100644 --- a/src/inc/tpercentile.h +++ b/src/inc/qpercentile.h @@ -16,7 +16,7 @@ #ifndef TDENGINE_TPERCENTILE_H #define TDENGINE_TPERCENTILE_H -#include "textbuffer.h" +#include "qextbuffer.h" typedef struct MinMaxEntry { union { diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index a3fdf4b2dff4c5d1b149fd56b62498903b374996..ce6336be9e51f97476978a06f5efcdaf7bc524ff 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -14,22 +14,11 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "taoserror.h" -#include "taosmsg.h" -#include "tast.h" -#include "textbuffer.h" -#include "tschemautil.h" -#include "tscompression.h" -#include "tskiplist.h" -#include "tsqlfunction.h" -#include "tstatus.h" -#include "ttime.h" -#include "mnode.h" +#include "mgmtTable.h" #include "mgmtAcct.h" #include "mgmtChildTable.h" -#include "mgmtDb.h" #include "mgmtDClient.h" +#include "mgmtDb.h" #include "mgmtDnode.h" #include "mgmtGrant.h" #include "mgmtMnode.h" @@ -37,9 +26,20 @@ #include "mgmtProfile.h" #include "mgmtShell.h" #include "mgmtSuperTable.h" -#include "mgmtTable.h" #include "mgmtUser.h" #include "mgmtVgroup.h" +#include "mnode.h" +#include "os.h" +#include "qast.h" +#include "qextbuffer.h" +#include "taoserror.h" +#include "taosmsg.h" +#include "tschemautil.h" +#include "tscompression.h" +#include "tskiplist.h" +#include "tsqlfunction.h" +#include "tstatus.h" +#include "ttime.h" extern void *tsNormalTableSdb; extern void *tsChildTableSdb; diff --git a/src/query/inc/tast.h b/src/query/inc/qast.h similarity index 100% rename from src/query/inc/tast.h rename to src/query/inc/qast.h diff --git a/src/query/inc/textbuffer.h b/src/query/inc/qextbuffer.h similarity index 98% rename from src/query/inc/textbuffer.h rename to src/query/inc/qextbuffer.h index 1f2955ba0d231c1aa5c2abefb72f4b2bc4c10c8b..9624cfff59dfc4a22a2048f8145676228bbbadf9 100644 --- a/src/query/inc/textbuffer.h +++ b/src/query/inc/qextbuffer.h @@ -131,13 +131,6 @@ typedef struct tSidSet { SColumnOrderInfo orderIdx; } tSidSet; -/** - * - * @param fileNamePattern - * @param dstPath - */ -void getTmpfilePath(const char *fileNamePattern, char *dstPath); - /** * * @param inMemSize diff --git a/src/query/inc/thistogram.h b/src/query/inc/qhistogram.h similarity index 100% rename from src/query/inc/thistogram.h rename to src/query/inc/qhistogram.h diff --git a/src/query/inc/tinterpolation.h b/src/query/inc/qinterpolation.h similarity index 98% rename from src/query/inc/tinterpolation.h rename to src/query/inc/qinterpolation.h index f4b327bcbec82b2b9ca8e2f5c92b044700240dbc..c8ebd850b61cdc69e5816837d5a439a10ba99e43 100644 --- a/src/query/inc/tinterpolation.h +++ b/src/query/inc/qinterpolation.h @@ -20,6 +20,10 @@ extern "C" { #endif +#include "os.h" +#include "taosdef.h" +#include "qextbuffer.h" + typedef struct SInterpolationInfo { int64_t startTimestamp; int32_t order; // order [asc/desc] diff --git a/src/query/inc/tresultBuf.h b/src/query/inc/qresultBuf.h similarity index 99% rename from src/query/inc/tresultBuf.h rename to src/query/inc/qresultBuf.h index 8f30ff7c61555e993838989aecb32a81d1c414bf..346dc2d00c66e33cf63cffdb3c748cfecad02832 100644 --- a/src/query/inc/tresultBuf.h +++ b/src/query/inc/qresultBuf.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "os.h" -#include "textbuffer.h" +#include "qextbuffer.h" typedef struct SIDList { uint32_t alloc; diff --git a/src/query/inc/qsqlparser.h b/src/query/inc/qsqlparser.h index 7a1322be10fb24581b9f0d38db173a7d7f86612e..064ded2fad49d3001596d6e2e7ac762ce59829b9 100644 --- a/src/query/inc/qsqlparser.h +++ b/src/query/inc/qsqlparser.h @@ -329,6 +329,18 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *pToken); void *ParseAlloc(void *(*mallocProc)(size_t)); +enum { + TSQL_NODE_TYPE_EXPR = 0x1, + TSQL_NODE_TYPE_ID = 0x2, + TSQL_NODE_TYPE_VALUE = 0x4, +}; + +#define NON_ARITHMEIC_EXPR 0 +#define NORMAL_ARITHMETIC 1 +#define AGG_ARIGHTMEIC 2 + +int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql); + #ifdef __cplusplus } #endif diff --git a/src/query/inc/tscSyntaxtreefunction.h b/src/query/inc/qsyntaxtreefunction.h similarity index 100% rename from src/query/inc/tscSyntaxtreefunction.h rename to src/query/inc/qsyntaxtreefunction.h diff --git a/src/query/inc/qtsbuf.h b/src/query/inc/qtsbuf.h new file mode 100644 index 0000000000000000000000000000000000000000..1afdb0cd6cb9175b14675d06446afbe3a891df27 --- /dev/null +++ b/src/query/inc/qtsbuf.h @@ -0,0 +1,142 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_STSBUF_H +#define TDENGINE_STSBUF_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "os.h" +#include "taosdef.h" + +#define MEM_BUF_SIZE (1 << 20) +#define TS_COMP_FILE_MAGIC 0x87F5EC4C +#define TS_COMP_FILE_VNODE_MAX 512 + +typedef struct STSList { + char* rawBuf; + int32_t allocSize; + int32_t threshold; + int32_t len; +} STSList; + +typedef struct STSRawBlock { + int32_t vnode; + int64_t tag; + TSKEY* ts; + int32_t len; +} STSRawBlock; + +typedef struct STSElem { + TSKEY ts; + int64_t tag; + int32_t vnode; +} STSElem; + +typedef struct STSCursor { + int32_t vnodeIndex; + int32_t blockIndex; + int32_t tsIndex; + int32_t order; +} STSCursor; + +typedef struct STSBlock { + int64_t tag; // tag value + int32_t numOfElem; // number of elements + int32_t compLen; // size after compressed + int32_t padding; // 0xFFFFFFFF by default, after the payload + char* payload; // actual data that is compressed +} STSBlock; + +/* + * The size of buffer file should not be greater than 2G, + * and the offset of int32_t type is enough + */ +typedef struct STSVnodeBlockInfo { + int32_t vnode; // vnode id + int32_t offset; // offset set value in file + int32_t numOfBlocks; // number of total blocks + int32_t compLen; // compressed size +} STSVnodeBlockInfo; + +typedef struct STSVnodeBlockInfoEx { + STSVnodeBlockInfo info; + int32_t len; // length before compress +} STSVnodeBlockInfoEx; + +typedef struct STSBuf { + FILE* f; + char path[PATH_MAX]; + uint32_t fileSize; + + STSVnodeBlockInfoEx* pData; + int32_t numOfAlloc; + int32_t numOfVnodes; + + char* assistBuf; + int32_t bufSize; + STSBlock block; + STSList tsData; // uncompressed raw ts data + uint64_t numOfTotal; + bool autoDelete; + int32_t tsOrder; // order of timestamp in ts comp buffer + STSCursor cur; +} STSBuf; + +typedef struct STSBufFileHeader { + uint32_t magic; // file magic number + uint32_t numOfVnode; // number of vnode stored in current file + uint32_t tsOrder; // timestamp order in current file +} STSBufFileHeader; + +STSBuf* tsBufCreate(bool autoDelete); +STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete); +STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder); + +void* tsBufDestory(STSBuf* pTSBuf); + +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len); +int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx); + +STSBuf* tsBufClone(STSBuf* pTSBuf); + +STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId); + +void tsBufFlush(STSBuf* pTSBuf); + +void tsBufResetPos(STSBuf* pTSBuf); +STSElem tsBufGetElem(STSBuf* pTSBuf); +bool tsBufNextPos(STSBuf* pTSBuf); + +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag); + +STSCursor tsBufGetCursor(STSBuf* pTSBuf); +void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order); + +void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur); + +/** + * display all data in comp block file, for debug purpose only + * @param pTSBuf + */ +void tsBufDisplay(STSBuf* pTSBuf); + +#ifdef __cplusplus +} +#endif + +#endif // TDENGINE_STSBUF_H diff --git a/src/query/inc/sql.y b/src/query/inc/sql.y index 2b7e0b628cb0cf951db287722036523749e0fdd8..585aa8a9cf8a5166e00ec23b9782bb979c671634 100644 --- a/src/query/inc/sql.y +++ b/src/query/inc/sql.y @@ -26,7 +26,6 @@ #include #include #include -#include "tscSQLParser.h" #include "tutil.h" } diff --git a/src/query/src/tscAst.c b/src/query/src/qast.c similarity index 98% rename from src/query/src/tscAst.c rename to src/query/src/qast.c index ef83083a2c5aa91fdbbb4db5bbe8330ae188d594..ba8031578fd5ea7194b1e192026c1181a2746a61 100644 --- a/src/query/src/tscAst.c +++ b/src/query/src/qast.c @@ -13,19 +13,18 @@ * along with this program. If not, see . */ +#include "qast.h" #include "os.h" +#include "qsqlparser.h" +#include "qsyntaxtreefunction.h" #include "taosdef.h" #include "taosmsg.h" -#include "tast.h" #include "tlog.h" -#include "tscSyntaxtreefunction.h" #include "tschemautil.h" #include "tsqlfunction.h" #include "tstoken.h" #include "ttokendef.h" -#include "taosdef.h" #include "tutil.h" -#include "qsqlparser.h" /* * @@ -609,6 +608,7 @@ int32_t merge(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset * // } // // return pFinalRes->num; + return 0; } int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *pFinalRes) { @@ -643,12 +643,13 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults // } // // return pFinalRes->num; + return 0; } /* * traverse the result and apply the function to each item to check if the item is qualified or not */ -static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { +static UNUSED_FUNC void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) { assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE); // brutal force scan the result list and check for each item in the list @@ -705,7 +706,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu * @param pSchema tag schemas * @param fp filter callback function */ -static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) { +static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) { int32_t n = 0; for (int32_t i = 0; i < pResult->num; ++i) { void *pItem = pResult->pRes[i]; diff --git a/src/query/src/textbuffer.c b/src/query/src/qextbuffer.c similarity index 98% rename from src/query/src/textbuffer.c rename to src/query/src/qextbuffer.c index 056fe808588e673126081355ca31020f672ab368..d71d5669996ef306b7185d899c3eab52f893aa61 100644 --- a/src/query/src/textbuffer.c +++ b/src/query/src/qextbuffer.c @@ -12,42 +12,19 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ +#include "qextbuffer.h" #include "os.h" #include "taos.h" +#include "taosdef.h" #include "taosmsg.h" -#include "textbuffer.h" #include "tlog.h" #include "tsqlfunction.h" #include "ttime.h" -#include "taosdef.h" #include "tutil.h" #define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \ (data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes) -int32_t tmpFileSerialNum = 0; - -void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { - const char* tdengineTmpFileNamePrefix = "tdengine-"; - - char tmpPath[MAX_TMPFILE_PATH_LENGTH] = {0}; - -#ifdef WINDOWS - char *tmpDir = getenv("tmp"); - if (tmpDir == NULL) { - tmpDir = ""; - } -#else - char *tmpDir = "/tmp/"; -#endif - - strcpy(tmpPath, tmpDir); - strcat(tmpPath, tdengineTmpFileNamePrefix); - strcat(tmpPath, fileNamePrefix); - strcat(tmpPath, "-%llu-%u"); - snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1)); -} - /* * SColumnModel is deeply copy */ diff --git a/src/query/src/thistogram.c b/src/query/src/qhistogram.c similarity index 99% rename from src/query/src/thistogram.c rename to src/query/src/qhistogram.c index 31045a4957be59d924eb70fb0876eefb6715aaa2..26482e9f142728f0b097511f98ca2e297e34ef7b 100644 --- a/src/query/src/thistogram.c +++ b/src/query/src/qhistogram.c @@ -14,10 +14,10 @@ */ #include "os.h" +#include "qhistogram.h" +#include "taosdef.h" #include "taosmsg.h" -#include "thistogram.h" #include "tlosertree.h" -#include "taosdef.h" /** * diff --git a/src/query/src/tinterpolation.c b/src/query/src/qinterpolation.c similarity index 99% rename from src/query/src/tinterpolation.c rename to src/query/src/qinterpolation.c index 1a9da44788f5c9bc5a066bd927beaefd0f58ea34..1731e16ed82703753c59d2a8f34411b39eba8c3b 100644 --- a/src/query/src/tinterpolation.c +++ b/src/query/src/qinterpolation.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ +#include "qinterpolation.h" #include "os.h" +#include "qextbuffer.h" +#include "taosdef.h" #include "taosmsg.h" -#include "textbuffer.h" -#include "tinterpolation.h" #include "tsqlfunction.h" -#include "taosdef.h" #define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC) diff --git a/src/query/src/tscSQLParserImpl.c b/src/query/src/qparserImpl.c similarity index 100% rename from src/query/src/tscSQLParserImpl.c rename to src/query/src/qparserImpl.c diff --git a/src/query/src/tpercentile.c b/src/query/src/qpercentile.c similarity index 99% rename from src/query/src/tpercentile.c rename to src/query/src/qpercentile.c index 6e1c28c516ce08e233b7c9afa418e571bd94e051..3b12dee0538200903ae19f2a364ea2b75543a1ae 100644 --- a/src/query/src/tpercentile.c +++ b/src/query/src/qpercentile.c @@ -15,11 +15,10 @@ #include "os.h" -#include "taosmsg.h" +#include "qpercentile.h" #include "taosdef.h" +#include "taosmsg.h" #include "tlog.h" -#include "taosdef.h" -#include "tpercentile.h" tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { tExtMemBuffer *pBuffer = NULL; diff --git a/src/query/src/tresultBuf.c b/src/query/src/qresultBuf.c similarity index 99% rename from src/query/src/tresultBuf.c rename to src/query/src/qresultBuf.c index 11e17cc5a3437c8c03176cb2ead3318bb9a42d88..fa7c59be4e692885f812075f034936cb37e5d3ce 100644 --- a/src/query/src/tresultBuf.c +++ b/src/query/src/qresultBuf.c @@ -1,9 +1,9 @@ +#include "qresultBuf.h" #include "hash.h" +#include "qextbuffer.h" #include "taoserror.h" -#include "textbuffer.h" #include "tlog.h" #include "tsqlfunction.h" -#include "tresultBuf.h" #define DEFAULT_INTERN_BUF_SIZE 16384L diff --git a/src/query/src/tscSyntaxtreefunction.c b/src/query/src/qsyntaxtreefunction.c similarity index 99% rename from src/query/src/tscSyntaxtreefunction.c rename to src/query/src/qsyntaxtreefunction.c index e3c00ed59d1da9ee6e042f748a6bf6f586f8bd2f..d21f7dab736b1b14579f3b207da89695e93a1077 100644 --- a/src/query/src/tscSyntaxtreefunction.c +++ b/src/query/src/qsyntaxtreefunction.c @@ -15,7 +15,7 @@ #include "os.h" -#include "tscSyntaxtreefunction.h" +#include "qsyntaxtreefunction.h" #include "taosdef.h" #include "tutil.h" diff --git a/src/query/src/ttokenizer.c b/src/query/src/qtokenizer.c similarity index 100% rename from src/query/src/ttokenizer.c rename to src/query/src/qtokenizer.c diff --git a/src/query/src/qtsbuf.c b/src/query/src/qtsbuf.c new file mode 100644 index 0000000000000000000000000000000000000000..ea6e6dfdc0f6db5f18b2254887df028f75233fa6 --- /dev/null +++ b/src/query/src/qtsbuf.c @@ -0,0 +1,923 @@ +#include "qtsbuf.h" +#include "tscompression.h" +#include "tutil.h" +#include "taoserror.h" + +static int32_t getDataStartOffset(); +static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo); +static STSBuf* allocResForTSBuf(STSBuf* pTSBuf); +static int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader); + + +/** + * todo error handling + * support auto closeable tmp file + * @param path + * @return + */ +STSBuf* tsBufCreate(bool autoDelete) { + STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); + if (pTSBuf == NULL) { + return NULL; + } + + getTmpfilePath("join", pTSBuf->path); + pTSBuf->f = fopen(pTSBuf->path, "w+"); + if (pTSBuf->f == NULL) { + free(pTSBuf); + return NULL; + } + + if (NULL == allocResForTSBuf(pTSBuf)) { + return NULL; + } + + // update the header info + STSBufFileHeader header = {.magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = TSQL_SO_ASC}; + STSBufUpdateHeader(pTSBuf, &header); + + tsBufResetPos(pTSBuf); + pTSBuf->cur.order = TSQL_SO_ASC; + + pTSBuf->autoDelete = autoDelete; + pTSBuf->tsOrder = -1; + + return pTSBuf; +} + +STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) { + STSBuf* pTSBuf = calloc(1, sizeof(STSBuf)); + if (pTSBuf == NULL) { + return NULL; + } + + strncpy(pTSBuf->path, path, PATH_MAX); + + pTSBuf->f = fopen(pTSBuf->path, "r+"); + if (pTSBuf->f == NULL) { + free(pTSBuf); + return NULL; + } + + if (allocResForTSBuf(pTSBuf) == NULL) { + return NULL; + } + + // validate the file magic number + STSBufFileHeader header = {0}; + fseek(pTSBuf->f, 0, SEEK_SET); + fread(&header, 1, sizeof(header), pTSBuf->f); + + // invalid file + if (header.magic != TS_COMP_FILE_MAGIC) { + return NULL; + } + + if (header.numOfVnode > pTSBuf->numOfAlloc) { + pTSBuf->numOfAlloc = header.numOfVnode; + STSVnodeBlockInfoEx* tmp = realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * pTSBuf->numOfAlloc); + if (tmp == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } + + pTSBuf->pData = tmp; + } + + pTSBuf->numOfVnodes = header.numOfVnode; + + // check the ts order + pTSBuf->tsOrder = header.tsOrder; + if (pTSBuf->tsOrder != TSQL_SO_ASC && pTSBuf->tsOrder != TSQL_SO_DESC) { +// tscError("invalid order info in buf:%d", pTSBuf->tsOrder); + tsBufDestory(pTSBuf); + return NULL; + } + + size_t infoSize = sizeof(STSVnodeBlockInfo) * pTSBuf->numOfVnodes; + + STSVnodeBlockInfo* buf = (STSVnodeBlockInfo*)calloc(1, infoSize); + + //int64_t pos = ftell(pTSBuf->f); //pos not used + fread(buf, infoSize, 1, pTSBuf->f); + + // the length value for each vnode is not kept in file, so does not set the length value + for (int32_t i = 0; i < pTSBuf->numOfVnodes; ++i) { + STSVnodeBlockInfoEx* pBlockList = &pTSBuf->pData[i]; + memcpy(&pBlockList->info, &buf[i], sizeof(STSVnodeBlockInfo)); + } + + free(buf); + + fseek(pTSBuf->f, 0, SEEK_END); + + struct stat fileStat; + fstat(fileno(pTSBuf->f), &fileStat); + + pTSBuf->fileSize = (uint32_t)fileStat.st_size; + tsBufResetPos(pTSBuf); + + // ascending by default + pTSBuf->cur.order = TSQL_SO_ASC; + + pTSBuf->autoDelete = autoDelete; + +// tscTrace("create tsBuf from file:%s, fd:%d, size:%d, numOfVnode:%d, autoDelete:%d", pTSBuf->path, fileno(pTSBuf->f), +// pTSBuf->fileSize, pTSBuf->numOfVnodes, pTSBuf->autoDelete); + + return pTSBuf; +} + +void* tsBufDestory(STSBuf* pTSBuf) { + if (pTSBuf == NULL) { + return NULL; + } + + tfree(pTSBuf->assistBuf); + tfree(pTSBuf->tsData.rawBuf); + + tfree(pTSBuf->pData); + tfree(pTSBuf->block.payload); + + fclose(pTSBuf->f); + + if (pTSBuf->autoDelete) { +// ("tsBuf %p destroyed, delete tmp file:%s", pTSBuf, pTSBuf->path); + unlink(pTSBuf->path); + } else { +// tscTrace("tsBuf %p destroyed, tmp file:%s, remains", pTSBuf, pTSBuf->path); + } + + free(pTSBuf); + return NULL; +} + +static STSVnodeBlockInfoEx* tsBufGetLastVnodeInfo(STSBuf* pTSBuf) { + int32_t last = pTSBuf->numOfVnodes - 1; + + assert(last >= 0); + return &pTSBuf->pData[last]; +} + +static STSVnodeBlockInfoEx* addOneVnodeInfo(STSBuf* pTSBuf, int32_t vnodeId) { + if (pTSBuf->numOfAlloc <= pTSBuf->numOfVnodes) { + uint32_t newSize = (uint32_t)(pTSBuf->numOfAlloc * 1.5); + assert(newSize > pTSBuf->numOfAlloc); + + STSVnodeBlockInfoEx* tmp = (STSVnodeBlockInfoEx*)realloc(pTSBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); + if (tmp == NULL) { + return NULL; + } + + pTSBuf->pData = tmp; + pTSBuf->numOfAlloc = newSize; + memset(&pTSBuf->pData[pTSBuf->numOfVnodes], 0, sizeof(STSVnodeBlockInfoEx) * (newSize - pTSBuf->numOfVnodes)); + } + + if (pTSBuf->numOfVnodes > 0) { + STSVnodeBlockInfoEx* pPrevBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); + + // update prev vnode length info in file + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pPrevBlockInfoEx->info); + } + + // set initial value for vnode block + STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[pTSBuf->numOfVnodes].info; + pBlockInfo->vnode = vnodeId; + pBlockInfo->offset = pTSBuf->fileSize; + assert(pBlockInfo->offset >= getDataStartOffset()); + + // update vnode info in file + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes, pBlockInfo); + + // add one vnode info + pTSBuf->numOfVnodes += 1; + + // update the header info + STSBufFileHeader header = { + .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + + STSBufUpdateHeader(pTSBuf, &header); + return tsBufGetLastVnodeInfo(pTSBuf); +} + +static void shrinkBuffer(STSList* ptsData) { + // shrink tmp buffer size if it consumes too many memory compared to the pre-defined size + if (ptsData->allocSize >= ptsData->threshold * 2) { + ptsData->rawBuf = realloc(ptsData->rawBuf, MEM_BUF_SIZE); + ptsData->allocSize = MEM_BUF_SIZE; + } +} + +static void writeDataToDisk(STSBuf* pTSBuf) { + if (pTSBuf->tsData.len == 0) { + return; + } + + STSBlock* pBlock = &pTSBuf->block; + + pBlock->numOfElem = pTSBuf->tsData.len / TSDB_KEYSIZE; + pBlock->compLen = + tsCompressTimestamp(pTSBuf->tsData.rawBuf, pTSBuf->tsData.len, pTSBuf->tsData.len / TSDB_KEYSIZE, pBlock->payload, + pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); + + int64_t r = fseek(pTSBuf->f, pTSBuf->fileSize, SEEK_SET); + UNUSED(r); + + /* + * format for output data: + * 1. tags, number of ts, size after compressed, payload, size after compressed + * 2. tags, number of ts, size after compressed, payload, size after compressed + * + * both side has the compressed length is used to support load data forwards/backwords. + */ + fwrite(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); + fwrite(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); + + fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); + + fwrite(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); + + fwrite(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); + + int32_t blockSize = sizeof(pBlock->tag) + sizeof(pBlock->numOfElem) + sizeof(pBlock->compLen) * 2 + pBlock->compLen; + pTSBuf->fileSize += blockSize; + + pTSBuf->tsData.len = 0; + + STSVnodeBlockInfoEx* pVnodeBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); + + pVnodeBlockInfoEx->info.compLen += blockSize; + pVnodeBlockInfoEx->info.numOfBlocks += 1; + + shrinkBuffer(&pTSBuf->tsData); +} + +static void expandBuffer(STSList* ptsData, int32_t inputSize) { + if (ptsData->allocSize - ptsData->len < inputSize) { + int32_t newSize = inputSize + ptsData->len; + char* tmp = realloc(ptsData->rawBuf, (size_t)newSize); + if (tmp == NULL) { + // todo + } + + ptsData->rawBuf = tmp; + ptsData->allocSize = newSize; + } +} + +STSBlock* readDataFromDisk(STSBuf* pTSBuf, int32_t order, bool decomp) { + STSBlock* pBlock = &pTSBuf->block; + + // clear the memory buffer + void* tmp = pBlock->payload; + memset(pBlock, 0, sizeof(STSBlock)); + pBlock->payload = tmp; + + if (order == TSQL_SO_DESC) { + /* + * set the right position for the reversed traverse, the reversed traverse is started from + * the end of each comp data block + */ + fseek(pTSBuf->f, -sizeof(pBlock->padding), SEEK_CUR); + fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); + + pBlock->compLen = pBlock->padding; + int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + fseek(pTSBuf->f, -offset, SEEK_CUR); + } + + fread(&pBlock->tag, sizeof(pBlock->tag), 1, pTSBuf->f); + fread(&pBlock->numOfElem, sizeof(pBlock->numOfElem), 1, pTSBuf->f); + + fread(&pBlock->compLen, sizeof(pBlock->compLen), 1, pTSBuf->f); + fread(pBlock->payload, (size_t)pBlock->compLen, 1, pTSBuf->f); + + if (decomp) { + pTSBuf->tsData.len = + tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, + pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); + } + + // read the comp length at the length of comp block + fread(&pBlock->padding, sizeof(pBlock->padding), 1, pTSBuf->f); + + // for backwards traverse, set the start position at the end of previous block + if (order == TSQL_SO_DESC) { + int32_t offset = pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + int64_t r = fseek(pTSBuf->f, -offset, SEEK_CUR); + UNUSED(r); + } + + return pBlock; +} + +// set the order of ts buffer if the ts order has not been set yet +static int32_t setCheckTSOrder(STSBuf* pTSBuf, const char* pData, int32_t len) { + STSList* ptsData = &pTSBuf->tsData; + + if (pTSBuf->tsOrder == -1) { + if (ptsData->len > 0) { + TSKEY lastKey = *(TSKEY*)(ptsData->rawBuf + ptsData->len - TSDB_KEYSIZE); + + if (lastKey > *(TSKEY*)pData) { + pTSBuf->tsOrder = TSQL_SO_DESC; + } else { + pTSBuf->tsOrder = TSQL_SO_ASC; + } + } else if (len > TSDB_KEYSIZE) { + // no data in current vnode, more than one ts is added, check the orders + TSKEY k1 = *(TSKEY*)(pData); + TSKEY k2 = *(TSKEY*)(pData + TSDB_KEYSIZE); + + if (k1 < k2) { + pTSBuf->tsOrder = TSQL_SO_ASC; + } else if (k1 > k2) { + pTSBuf->tsOrder = TSQL_SO_DESC; + } else { + // todo handle error + } + } + } else { + // todo the timestamp order is set, check the asc/desc order of appended data + } + + return TSDB_CODE_SUCCESS; +} + +void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len) { + STSVnodeBlockInfoEx* pBlockInfo = NULL; + STSList* ptsData = &pTSBuf->tsData; + + if (pTSBuf->numOfVnodes == 0 || tsBufGetLastVnodeInfo(pTSBuf)->info.vnode != vnodeId) { + writeDataToDisk(pTSBuf); + shrinkBuffer(ptsData); + + pBlockInfo = addOneVnodeInfo(pTSBuf, vnodeId); + } else { + pBlockInfo = tsBufGetLastVnodeInfo(pTSBuf); + } + + assert(pBlockInfo->info.vnode == vnodeId); + + if (pTSBuf->block.tag != tag && ptsData->len > 0) { + // new arrived data with different tags value, save current value into disk first + writeDataToDisk(pTSBuf); + } else { + expandBuffer(ptsData, len); + } + + pTSBuf->block.tag = tag; + memcpy(ptsData->rawBuf + ptsData->len, pData, (size_t)len); + + // todo check return value + setCheckTSOrder(pTSBuf, pData, len); + + ptsData->len += len; + pBlockInfo->len += len; + + pTSBuf->numOfTotal += len / TSDB_KEYSIZE; + + // the size of raw data exceeds the size of the default prepared buffer, so + // during getBufBlock, the output buffer needs to be large enough. + if (ptsData->len >= ptsData->threshold) { + writeDataToDisk(pTSBuf); + shrinkBuffer(ptsData); + } + + tsBufResetPos(pTSBuf); +} + +void tsBufFlush(STSBuf* pTSBuf) { + if (pTSBuf->tsData.len <= 0) { + return; + } + + writeDataToDisk(pTSBuf); + shrinkBuffer(&pTSBuf->tsData); + + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pTSBuf); + + // update prev vnode length info in file + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, &pBlockInfoEx->info); + + // save the ts order into header + STSBufFileHeader header = { + .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + STSBufUpdateHeader(pTSBuf, &header); + + fsync(fileno(pTSBuf->f)); +} + +static int32_t tsBufFindVnodeIndexFromId(STSVnodeBlockInfoEx* pVnodeInfoEx, int32_t numOfVnodes, int32_t vnodeId) { + int32_t j = -1; + for (int32_t i = 0; i < numOfVnodes; ++i) { + if (pVnodeInfoEx[i].info.vnode == vnodeId) { + j = i; + break; + } + } + + return j; +} + +// todo opt performance by cache blocks info +static int32_t tsBufFindBlock(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int32_t blockIndex) { + if (fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET) != 0) { + return -1; + } + + // sequentially read the compressed data blocks, start from the beginning of the comp data block of this vnode + int32_t i = 0; + bool decomp = false; + + while ((i++) <= blockIndex) { + if (readDataFromDisk(pTSBuf, TSQL_SO_ASC, decomp) == NULL) { + return -1; + } + } + + // set the file position to be the end of previous comp block + if (pTSBuf->cur.order == TSQL_SO_DESC) { + STSBlock* pBlock = &pTSBuf->block; + int32_t compBlockSize = + pBlock->compLen + sizeof(pBlock->compLen) * 2 + sizeof(pBlock->numOfElem) + sizeof(pBlock->tag); + fseek(pTSBuf->f, -compBlockSize, SEEK_CUR); + } + + return 0; +} + +static int32_t tsBufFindBlockByTag(STSBuf* pTSBuf, STSVnodeBlockInfo* pBlockInfo, int64_t tag) { + bool decomp = false; + + int64_t offset = 0; + if (pTSBuf->cur.order == TSQL_SO_ASC) { + offset = pBlockInfo->offset; + } else { // reversed traverse starts from the end of block + offset = pBlockInfo->offset + pBlockInfo->compLen; + } + + if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) { + return -1; + } + + for (int32_t i = 0; i < pBlockInfo->numOfBlocks; ++i) { + if (readDataFromDisk(pTSBuf, pTSBuf->cur.order, decomp) == NULL) { + return -1; + } + + if (pTSBuf->block.tag == tag) { + return i; + } + } + + return -1; +} + +static void tsBufGetBlock(STSBuf* pTSBuf, int32_t vnodeIndex, int32_t blockIndex) { + STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[vnodeIndex].info; + if (pBlockInfo->numOfBlocks <= blockIndex) { + assert(false); + } + + STSCursor* pCur = &pTSBuf->cur; + if (pCur->vnodeIndex == vnodeIndex && ((pCur->blockIndex <= blockIndex && pCur->order == TSQL_SO_ASC) || + (pCur->blockIndex >= blockIndex && pCur->order == TSQL_SO_DESC))) { + int32_t i = 0; + bool decomp = false; + int32_t step = abs(blockIndex - pCur->blockIndex); + + while ((++i) <= step) { + if (readDataFromDisk(pTSBuf, pCur->order, decomp) == NULL) { + return; + } + } + } else { + if (tsBufFindBlock(pTSBuf, pBlockInfo, blockIndex) == -1) { + assert(false); + } + } + + STSBlock* pBlock = &pTSBuf->block; + + size_t s = pBlock->numOfElem * TSDB_KEYSIZE; + + /* + * In order to accommodate all the qualified data, the actual buffer size for one block with identical tags value + * may exceed the maximum allowed size during *tsBufAppend* function by invoking expandBuffer function + */ + if (s > pTSBuf->tsData.allocSize) { + expandBuffer(&pTSBuf->tsData, s); + } + + pTSBuf->tsData.len = + tsDecompressTimestamp(pBlock->payload, pBlock->compLen, pBlock->numOfElem, pTSBuf->tsData.rawBuf, + pTSBuf->tsData.allocSize, TWO_STAGE_COMP, pTSBuf->assistBuf, pTSBuf->bufSize); + + assert((pTSBuf->tsData.len / TSDB_KEYSIZE == pBlock->numOfElem) && (pTSBuf->tsData.allocSize >= pTSBuf->tsData.len)); + + pCur->vnodeIndex = vnodeIndex; + pCur->blockIndex = blockIndex; + + pCur->tsIndex = (pCur->order == TSQL_SO_ASC) ? 0 : pBlock->numOfElem - 1; +} + +STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId) { + int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); + if (j == -1) { + return NULL; + } + + return &pTSBuf->pData[j].info; +} + +int32_t STSBufUpdateHeader(STSBuf* pTSBuf, STSBufFileHeader* pHeader) { + if ((pTSBuf->f == NULL) || pHeader == NULL || pHeader->numOfVnode < 0 || pHeader->magic != TS_COMP_FILE_MAGIC) { + return -1; + } + + int64_t r = fseek(pTSBuf->f, 0, SEEK_SET); + if (r != 0) { + return -1; + } + + fwrite(pHeader, sizeof(STSBufFileHeader), 1, pTSBuf->f); + return 0; +} + +bool tsBufNextPos(STSBuf* pTSBuf) { + if (pTSBuf == NULL || pTSBuf->numOfVnodes == 0) { + return false; + } + + STSCursor* pCur = &pTSBuf->cur; + + // get the first/last position according to traverse order + if (pCur->vnodeIndex == -1) { + if (pCur->order == TSQL_SO_ASC) { + tsBufGetBlock(pTSBuf, 0, 0); + + if (pTSBuf->block.numOfElem == 0) { // the whole list is empty, return + tsBufResetPos(pTSBuf); + return false; + } else { + return true; + } + + } else { // get the last timestamp record in the last block of the last vnode + assert(pTSBuf->numOfVnodes > 0); + + int32_t vnodeIndex = pTSBuf->numOfVnodes - 1; + pCur->vnodeIndex = vnodeIndex; + + int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); + int32_t blockIndex = pBlockInfo->numOfBlocks - 1; + + tsBufGetBlock(pTSBuf, vnodeIndex, blockIndex); + + pCur->tsIndex = pTSBuf->block.numOfElem - 1; + if (pTSBuf->block.numOfElem == 0) { + tsBufResetPos(pTSBuf); + return false; + } else { + return true; + } + } + } + + int32_t step = pCur->order == TSQL_SO_ASC ? 1 : -1; + + while (1) { + assert(pTSBuf->tsData.len == pTSBuf->block.numOfElem * TSDB_KEYSIZE); + + if ((pCur->order == TSQL_SO_ASC && pCur->tsIndex >= pTSBuf->block.numOfElem - 1) || + (pCur->order == TSQL_SO_DESC && pCur->tsIndex <= 0)) { + int32_t vnodeId = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + + STSVnodeBlockInfo* pBlockInfo = tsBufGetVnodeBlockInfo(pTSBuf, vnodeId); + if (pBlockInfo == NULL || (pCur->blockIndex >= pBlockInfo->numOfBlocks - 1 && pCur->order == TSQL_SO_ASC) || + (pCur->blockIndex <= 0 && pCur->order == TSQL_SO_DESC)) { + if ((pCur->vnodeIndex >= pTSBuf->numOfVnodes - 1 && pCur->order == TSQL_SO_ASC) || + (pCur->vnodeIndex <= 0 && pCur->order == TSQL_SO_DESC)) { + pCur->vnodeIndex = -1; + return false; + } + + if (pBlockInfo == NULL) { + return false; + } + + int32_t blockIndex = pCur->order == TSQL_SO_ASC ? 0 : pBlockInfo->numOfBlocks - 1; + tsBufGetBlock(pTSBuf, pCur->vnodeIndex + step, blockIndex); + break; + + } else { + tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex + step); + break; + } + } else { + pCur->tsIndex += step; + break; + } + } + + return true; +} + +void tsBufResetPos(STSBuf* pTSBuf) { + if (pTSBuf == NULL) { + return; + } + + pTSBuf->cur = (STSCursor){.tsIndex = -1, .blockIndex = -1, .vnodeIndex = -1, .order = pTSBuf->cur.order}; +} + +STSElem tsBufGetElem(STSBuf* pTSBuf) { + STSElem elem1 = {.vnode = -1}; + STSCursor* pCur = &pTSBuf->cur; + + if (pTSBuf == NULL || pCur->vnodeIndex < 0) { + return elem1; + } + + STSBlock* pBlock = &pTSBuf->block; + + elem1.vnode = pTSBuf->pData[pCur->vnodeIndex].info.vnode; + elem1.ts = *(TSKEY*)(pTSBuf->tsData.rawBuf + pCur->tsIndex * TSDB_KEYSIZE); + elem1.tag = pBlock->tag; + + return elem1; +} + +/** + * current only support ts comp data from two vnode merge + * @param pDestBuf + * @param pSrcBuf + * @param vnodeId + * @return + */ +int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeId) { + if (pDestBuf == NULL || pSrcBuf == NULL || pSrcBuf->numOfVnodes <= 0) { + return 0; + } + + if (pDestBuf->numOfVnodes + pSrcBuf->numOfVnodes > TS_COMP_FILE_VNODE_MAX) { + return -1; + } + + // src can only have one vnode index + if (pSrcBuf->numOfVnodes > 1) { + return -1; + } + + // there are data in buffer, flush to disk first + tsBufFlush(pDestBuf); + + // compared with the last vnode id + if (vnodeId != tsBufGetLastVnodeInfo(pDestBuf)->info.vnode) { + int32_t oldSize = pDestBuf->numOfVnodes; + int32_t newSize = oldSize + pSrcBuf->numOfVnodes; + + if (pDestBuf->numOfAlloc < newSize) { + pDestBuf->numOfAlloc = newSize; + + STSVnodeBlockInfoEx* tmp = realloc(pDestBuf->pData, sizeof(STSVnodeBlockInfoEx) * newSize); + if (tmp == NULL) { + return -1; + } + + pDestBuf->pData = tmp; + } + + // directly copy the vnode index information + memcpy(&pDestBuf->pData[oldSize], pSrcBuf->pData, (size_t)pSrcBuf->numOfVnodes * sizeof(STSVnodeBlockInfoEx)); + + // set the new offset value + for (int32_t i = 0; i < pSrcBuf->numOfVnodes; ++i) { + STSVnodeBlockInfoEx* pBlockInfoEx = &pDestBuf->pData[i + oldSize]; + pBlockInfoEx->info.offset = (pSrcBuf->pData[i].info.offset - getDataStartOffset()) + pDestBuf->fileSize; + pBlockInfoEx->info.vnode = vnodeId; + } + + pDestBuf->numOfVnodes = newSize; + } else { + STSVnodeBlockInfoEx* pBlockInfoEx = tsBufGetLastVnodeInfo(pDestBuf); + + pBlockInfoEx->len += pSrcBuf->pData[0].len; + pBlockInfoEx->info.numOfBlocks += pSrcBuf->pData[0].info.numOfBlocks; + pBlockInfoEx->info.compLen += pSrcBuf->pData[0].info.compLen; + pBlockInfoEx->info.vnode = vnodeId; + } + + int32_t r = fseek(pDestBuf->f, 0, SEEK_END); + assert(r == 0); + + int64_t offset = getDataStartOffset(); + int32_t size = pSrcBuf->fileSize - offset; + +#ifdef LINUX + ssize_t rc = tsendfile(fileno(pDestBuf->f), fileno(pSrcBuf->f), &offset, size); +#else + ssize_t rc = fsendfile(pDestBuf->f, pSrcBuf->f, &offset, size); +#endif + + if (rc == -1) { +// tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); + return -1; + } + + if (rc != size) { +// tscError("failed to merge tsBuf from:%s to %s, reason:%s\n", pSrcBuf->path, pDestBuf->path, strerror(errno)); + return -1; + } + + pDestBuf->numOfTotal += pSrcBuf->numOfTotal; + + int32_t oldSize = pDestBuf->fileSize; + + struct stat fileStat; + fstat(fileno(pDestBuf->f), &fileStat); + pDestBuf->fileSize = (uint32_t)fileStat.st_size; + + assert(pDestBuf->fileSize == oldSize + size); + +// tscTrace("tsBuf merge success, %p, path:%s, fd:%d, file size:%d, numOfVnode:%d, autoDelete:%d", pDestBuf, +// pDestBuf->path, fileno(pDestBuf->f), pDestBuf->fileSize, pDestBuf->numOfVnodes, pDestBuf->autoDelete); + + return 0; +} + +STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t order) { + STSBuf* pTSBuf = tsBufCreate(true); + + STSVnodeBlockInfo* pBlockInfo = &(addOneVnodeInfo(pTSBuf, 0)->info); + pBlockInfo->numOfBlocks = numOfBlocks; + pBlockInfo->compLen = len; + pBlockInfo->offset = getDataStartOffset(); + pBlockInfo->vnode = 0; + + // update prev vnode length info in file + TSBufUpdateVnodeInfo(pTSBuf, pTSBuf->numOfVnodes - 1, pBlockInfo); + + fseek(pTSBuf->f, pBlockInfo->offset, SEEK_SET); + fwrite((void*)pData, 1, len, pTSBuf->f); + pTSBuf->fileSize += len; + + pTSBuf->tsOrder = order; + assert(order == TSQL_SO_ASC || order == TSQL_SO_DESC); + + STSBufFileHeader header = { + .magic = TS_COMP_FILE_MAGIC, .numOfVnode = pTSBuf->numOfVnodes, .tsOrder = pTSBuf->tsOrder}; + STSBufUpdateHeader(pTSBuf, &header); + + fsync(fileno(pTSBuf->f)); + + return pTSBuf; +} + +STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag) { + STSElem elem = {.vnode = -1}; + + if (pTSBuf == NULL) { + return elem; + } + + int32_t j = tsBufFindVnodeIndexFromId(pTSBuf->pData, pTSBuf->numOfVnodes, vnodeId); + if (j == -1) { + return elem; + } + + // for debug purpose + // tsBufDisplay(pTSBuf); + + STSCursor* pCur = &pTSBuf->cur; + STSVnodeBlockInfo* pBlockInfo = &pTSBuf->pData[j].info; + + int32_t blockIndex = tsBufFindBlockByTag(pTSBuf, pBlockInfo, tag); + if (blockIndex < 0) { + return elem; + } + + pCur->vnodeIndex = j; + pCur->blockIndex = blockIndex; + tsBufGetBlock(pTSBuf, j, blockIndex); + + return tsBufGetElem(pTSBuf); +} + +STSCursor tsBufGetCursor(STSBuf* pTSBuf) { + STSCursor c = {.vnodeIndex = -1}; + if (pTSBuf == NULL) { + return c; + } + + return pTSBuf->cur; +} + +void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur) { + if (pTSBuf == NULL || pCur == NULL) { + return; + } + + // assert(pCur->vnodeIndex != -1 && pCur->tsIndex >= 0 && pCur->blockIndex >= 0); + if (pCur->vnodeIndex != -1) { + tsBufGetBlock(pTSBuf, pCur->vnodeIndex, pCur->blockIndex); + } + + pTSBuf->cur = *pCur; +} + +void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order) { + if (pTSBuf == NULL) { + return; + } + + pTSBuf->cur.order = order; +} + +STSBuf* tsBufClone(STSBuf* pTSBuf) { + if (pTSBuf == NULL) { + return NULL; + } + + return tsBufCreateFromFile(pTSBuf->path, false); +} + +void tsBufDisplay(STSBuf* pTSBuf) { + printf("-------start of ts comp file-------\n"); + printf("number of vnode:%d\n", pTSBuf->numOfVnodes); + + int32_t old = pTSBuf->cur.order; + pTSBuf->cur.order = TSQL_SO_ASC; + + tsBufResetPos(pTSBuf); + + while (tsBufNextPos(pTSBuf)) { + STSElem elem = tsBufGetElem(pTSBuf); + printf("%d-%" PRId64 "-%" PRId64 "\n", elem.vnode, *(int64_t*) elem.tag, elem.ts); + } + + pTSBuf->cur.order = old; + printf("-------end of ts comp file-------\n"); +} + +static int32_t getDataStartOffset() { + return sizeof(STSBufFileHeader) + TS_COMP_FILE_VNODE_MAX * sizeof(STSVnodeBlockInfo); +} + +static int32_t doUpdateVnodeInfo(STSBuf* pTSBuf, int64_t offset, STSVnodeBlockInfo* pVInfo) { + if (offset < 0 || offset >= getDataStartOffset()) { + return -1; + } + + if (fseek(pTSBuf->f, offset, SEEK_SET) != 0) { + return -1; + } + + fwrite(pVInfo, sizeof(STSVnodeBlockInfo), 1, pTSBuf->f); + return 0; +} + +// update prev vnode length info in file +static void TSBufUpdateVnodeInfo(STSBuf* pTSBuf, int32_t index, STSVnodeBlockInfo* pBlockInfo) { + int32_t offset = sizeof(STSBufFileHeader) + index * sizeof(STSVnodeBlockInfo); + doUpdateVnodeInfo(pTSBuf, offset, pBlockInfo); +} + +static STSBuf* allocResForTSBuf(STSBuf* pTSBuf) { + const int32_t INITIAL_VNODEINFO_SIZE = 4; + + pTSBuf->numOfAlloc = INITIAL_VNODEINFO_SIZE; + pTSBuf->pData = calloc(pTSBuf->numOfAlloc, sizeof(STSVnodeBlockInfoEx)); + if (pTSBuf->pData == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } + + pTSBuf->tsData.rawBuf = malloc(MEM_BUF_SIZE); + if (pTSBuf->tsData.rawBuf == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } + + pTSBuf->bufSize = MEM_BUF_SIZE; + pTSBuf->tsData.threshold = MEM_BUF_SIZE; + pTSBuf->tsData.allocSize = MEM_BUF_SIZE; + + pTSBuf->assistBuf = malloc(MEM_BUF_SIZE); + if (pTSBuf->assistBuf == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } + + pTSBuf->block.payload = malloc(MEM_BUF_SIZE); + if (pTSBuf->block.payload == NULL) { + tsBufDestory(pTSBuf); + return NULL; + } + + pTSBuf->fileSize += getDataStartOffset(); + return pTSBuf; +} \ No newline at end of file diff --git a/src/query/inc/tcache.h b/src/util/inc/tcache.h similarity index 100% rename from src/query/inc/tcache.h rename to src/util/inc/tcache.h diff --git a/src/util/inc/tutil.h b/src/util/inc/tutil.h index cb311ed19361bad442e312732d8e499893e1556e..df97dde5ac7b1440b2cdf2abab9f0243f84ddee5 100644 --- a/src/util/inc/tutil.h +++ b/src/util/inc/tutil.h @@ -137,6 +137,13 @@ int64_t str2int64(char *str); int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath); +/** + * + * @param fileNamePattern + * @param dstPath + */ +void getTmpfilePath(const char *fileNamePattern, char *dstPath); + int32_t taosInitTimer(void (*callback)(int), int32_t ms); bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len); diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 28af28e507c167d1ef2087ae3fae0e6dd9511607..9cad14e8c707802768a84c48a236cfc76b0c3a1e 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -520,6 +520,7 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) { } pIter->pHashObj = pHashObj; + return pIter; } static SHashNode *getNextHashNode(SHashMutableIterator *pIter) { @@ -600,6 +601,7 @@ void *taosHashDestroyIter(SHashMutableIterator *iter) { } free(iter); + return NULL; } // for profile only diff --git a/src/client/src/tcache.c b/src/util/src/tcache.c similarity index 100% rename from src/client/src/tcache.c rename to src/util/src/tcache.c diff --git a/src/util/src/tutil.c b/src/util/src/tutil.c index d5e67adff693b427633908b39e862e1993812c5f..9c384b25bace047cdd80fe2903c330dbe3ed85d7 100644 --- a/src/util/src/tutil.c +++ b/src/util/src/tutil.c @@ -27,6 +27,8 @@ #include "tlog.h" #include "taoserror.h" +int32_t tmpFileSerialNum = 0; + int32_t strdequote(char *z) { if (z == NULL) { return 0; @@ -401,6 +403,27 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP return rename(fullPath, *dstPath); } +void getTmpfilePath(const char *fileNamePrefix, char *dstPath) { + const char* tdengineTmpFileNamePrefix = "tdengine-"; + + char tmpPath[PATH_MAX] = {0}; + +#ifdef WINDOWS + char *tmpDir = getenv("tmp"); + if (tmpDir == NULL) { + tmpDir = ""; + } +#else + char *tmpDir = "/tmp/"; +#endif + + strcpy(tmpPath, tmpDir); + strcat(tmpPath, tdengineTmpFileNamePrefix); + strcat(tmpPath, fileNamePrefix); + strcat(tmpPath, "-%llu-%u"); + snprintf(dstPath, PATH_MAX, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1)); +} + int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) { #if defined WINDOWS for (int i = 0; i < bytes; ++i) { diff --git a/src/vnode/detail/inc/vnodeRead.h b/src/vnode/detail/inc/vnodeRead.h index 2758cfe1d9610257c7ddf0658874a7ee57511fc0..232a92608c37b54fdbfe0291facca818afcbbda8 100644 --- a/src/vnode/detail/inc/vnodeRead.h +++ b/src/vnode/detail/inc/vnodeRead.h @@ -21,9 +21,9 @@ extern "C" { #endif #include "os.h" -#include "tresultBuf.h" +#include "qresultBuf.h" -#include "tinterpolation.h" +#include "qinterpolation.h" #include "vnodeTagMgmt.h" /* diff --git a/src/vnode/detail/inc/vnodeSupertableQuery.h b/src/vnode/detail/inc/vnodeSupertableQuery.h index bc7fa1e81b03d65c34dda30ea35d352867eaec75..cc2d21871c63510fb45bdf54db15944da593e349 100644 --- a/src/vnode/detail/inc/vnodeSupertableQuery.h +++ b/src/vnode/detail/inc/vnodeSupertableQuery.h @@ -16,11 +16,9 @@ #ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H #define TBASE_MNODE_SUPER_TABLE_QUERY_H -#include -#include -#include +#include "os.h" #include "mnode.h" -#include "tast.h" +#include "qast.h" int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes); void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes); diff --git a/src/vnode/detail/src/vnodeQueryImpl.c b/src/vnode/detail/src/vnodeQueryImpl.c index 2cdcc9130010c12d10553a6089f7ae441d7e94e1..80e073567261e6572bf786646510effac54099d2 100644 --- a/src/vnode/detail/src/vnodeQueryImpl.c +++ b/src/vnode/detail/src/vnodeQueryImpl.c @@ -16,11 +16,11 @@ #include "hash.h" #include "hashfunc.h" #include "os.h" +#include "qextbuffer.h" #include "taosmsg.h" -#include "textbuffer.h" #include "ttime.h" -#include "tinterpolation.h" +#include "qinterpolation.h" #include "tscJoinProcess.h" #include "tscSecondaryMerge.h" #include "tscompression.h" diff --git a/src/vnode/detail/src/vnodeQueryProcess.c b/src/vnode/detail/src/vnodeQueryProcess.c index ae51365918b142e392dcffa27a6b071543f3d02e..cedb76b4accda46e934694cdbb6de416dcc8f75b 100644 --- a/src/vnode/detail/src/vnodeQueryProcess.c +++ b/src/vnode/detail/src/vnodeQueryProcess.c @@ -16,8 +16,8 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qextbuffer.h" #include "taosmsg.h" -#include "textbuffer.h" #include "tscJoinProcess.h" #include "ttime.h" #include "vnode.h" diff --git a/src/vnode/detail/src/vnodeRead.c b/src/vnode/detail/src/vnodeRead.c index f21294a68e0928b55ebd7fcff28f1eb818af171e..86f508dd9180d5b0d512c3ce509e7cffb7254ca2 100644 --- a/src/vnode/detail/src/vnodeRead.c +++ b/src/vnode/detail/src/vnodeRead.c @@ -19,9 +19,9 @@ #include "hash.h" #include "hashfunc.h" #include "ihash.h" +#include "qast.h" +#include "qextbuffer.h" #include "taosmsg.h" -#include "tast.h" -#include "textbuffer.h" #include "tscJoinProcess.h" #include "tscompression.h" #include "vnode.h" diff --git a/src/vnode/detail/src/vnodeSupertableQuery.c b/src/vnode/detail/src/vnodeSupertableQuery.c index 038577bd8d02bc490d283b286b1f4f3d6fb2749b..36cb7ad74125cc677b48951e3dea7a203da57dc5 100644 --- a/src/vnode/detail/src/vnodeSupertableQuery.c +++ b/src/vnode/detail/src/vnodeSupertableQuery.c @@ -14,13 +14,12 @@ */ #define _DEFAULT_SOURCE -#include "os.h" #include "mnode.h" -#include "textbuffer.h" +#include "os.h" +#include "qast.h" +#include "qextbuffer.h" #include "tschemautil.h" #include "tsqlfunction.h" -#include "tast.h" -//#include "vnodeTagMgmt.h" typedef struct SSyntaxTreeFilterSupporter { SSchema* pTagSchema; diff --git a/src/vnode/detail/src/vnodeTagMgmt.c b/src/vnode/detail/src/vnodeTagMgmt.c index d3e22ec00adfbf418264888ea01a4940723df2a6..054a18900c050d700663903d1314c52bc21eb5a7 100644 --- a/src/vnode/detail/src/vnodeTagMgmt.c +++ b/src/vnode/detail/src/vnodeTagMgmt.c @@ -16,12 +16,12 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "qast.h" +#include "qextbuffer.h" #include "taosdef.h" +#include "taosmsg.h" #include "tlog.h" #include "tutil.h" -#include "taosmsg.h" -#include "textbuffer.h" -#include "tast.h" #include "vnodeTagMgmt.h" #define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[getColumnModelOffset(sc, col)]))) diff --git a/src/vnode/detail/src/vnodeUtil.c b/src/vnode/detail/src/vnodeUtil.c index feef9ed47324c8c32b64f7c5fb156e5a7c52bf21..43c24bae6af38676a7c9a4a3125f717dfc6d050c 100644 --- a/src/vnode/detail/src/vnodeUtil.c +++ b/src/vnode/detail/src/vnodeUtil.c @@ -16,13 +16,13 @@ #define _DEFAULT_SOURCE #include "os.h" -#include "tast.h" +#include "qast.h" #include "tscUtil.h" #include "tschemautil.h" #include "vnode.h" #include "vnodeDataFilterFunc.h" -#include "vnodeUtil.h" #include "vnodeStatus.h" +#include "vnodeUtil.h" int vnodeCheckFileIntegrity(FILE* fp) { /*