提交 5085e458 编写于 作者: S slguan

Merge branch '2.0' into refact/slguan

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSCCACHE_H
#define TDENGINE_TSCCACHE_H
#ifdef __cplusplus
extern "C" {
#endif
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer);
void taosCloseConnCache(void *handle);
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user);
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TSCACHE_H
......@@ -24,7 +24,6 @@ extern "C" {
#include "tsclient.h"
void tscFetchDatablockFromSubquery(SSqlObj* pSql);
void tscGetQualifiedTSList(SSqlObj* pSql, SJoinSubquerySupporter* p1, SJoinSubquerySupporter* p2, int32_t* num);
void tscSetupOutputColumnIndex(SSqlObj* pSql);
int32_t tscLaunchSecondPhaseSubqueries(SSqlObj* pSql);
......@@ -33,121 +32,6 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code);
SJoinSubquerySupporter* tscCreateJoinSupporter(SSqlObj* pSql, SSubqueryState* pState, int32_t index);
void tscDestroyJoinSupporter(SJoinSubquerySupporter* pSupporter);
#define MEM_BUF_SIZE (1<<20)
#define TS_COMP_BLOCK_PADDING 0xFFFFFFFF
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512
typedef struct STSList {
char* rawBuf;
int32_t allocSize;
int32_t threshold;
int32_t len;
} STSList;
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
typedef struct STSElem {
TSKEY ts;
int64_t tag;
int32_t vnode;
} STSElem;
typedef struct STSCursor {
int32_t vnodeIndex;
int32_t blockIndex;
int32_t tsIndex;
int32_t order;
} STSCursor;
typedef struct STSBlock {
int64_t tag; // tag value
int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed
int32_t padding; // 0xFFFFFFFF by default, after the payload
char* payload; // actual data that is compressed
} STSBlock;
typedef struct STSVnodeBlockInfo {
int32_t vnode;
/*
* The size of buffer file is not expected to be greater than 2G,
* and the offset of int32_t type is enough
*/
int32_t offset;
int32_t numOfBlocks;
int32_t compLen;
} STSVnodeBlockInfo;
typedef struct STSVnodeBlockInfoEx {
STSVnodeBlockInfo info;
int32_t len; // length before compress
} STSVnodeBlockInfoEx;
typedef struct STSBuf {
FILE* f;
char path[PATH_MAX];
uint32_t fileSize;
STSVnodeBlockInfoEx* pData;
int32_t numOfAlloc;
int32_t numOfVnodes;
char* assistBuf;
int32_t bufSize;
STSBlock block;
STSList tsData; // uncompressed raw ts data
uint64_t numOfTotal;
bool autoDelete;
int32_t tsOrder; // order of timestamp in ts comp buffer
STSCursor cur;
} STSBuf;
typedef struct STSBufFileHeader {
uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file
uint32_t tsOrder; // timestamp order in current file
} STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
STSBuf* tsBufClone(STSBuf* pTSBuf);
/**
* display all data in comp block file, for debug purpose only
* @param pTSBuf
*/
void tsBufDisplay(STSBuf* pTSBuf);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSQL_H
#define TDENGINE_TSQL_H
#ifdef __cplusplus
extern "C" {
#endif
#include "taos.h"
#include "taosmsg.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tvariant.h"
#include "qsqlparser.h"
enum {
TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
#ifdef __cplusplus
}
#endif
#endif
......@@ -20,9 +20,9 @@
extern "C" {
#endif
#include "qextbuffer.h"
#include "qinterpolation.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tinterpolation.h"
#include "tlosertree.h"
#include "tsclient.h"
......
......@@ -24,10 +24,10 @@ extern "C" {
* @date 2018/09/30
*/
#include "os.h"
#include "textbuffer.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "tscSecondaryMerge.h"
#include "tsclient.h"
#include "taosdef.h"
#define UTIL_METER_IS_SUPERTABLE(metaInfo) \
(((metaInfo)->pMeterMeta != NULL) && ((metaInfo)->pMeterMeta->tableType == TSDB_SUPER_TABLE))
......@@ -252,6 +252,7 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)());
int32_t launchMultivnodeInsert(SSqlObj *pSql);
#ifdef __cplusplus
}
......
......@@ -25,13 +25,13 @@ extern "C" {
#include "taosmsg.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tscCache.h"
#include "tscSQLParser.h"
#include "taosdef.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "trpc.h"
#include "qsqltype.h"
#include "qsqlparser.h"
#include "qtsbuf.h"
#define TSC_GET_RESPTR_BASE(res, _queryinfo, col) (res->data + ((_queryinfo)->fieldsInfo.pSqlExpr[col]->offset) * res->numOfRows)
......@@ -308,14 +308,14 @@ typedef struct _tsc_obj {
char sversion[TSDB_VERSION_LEN];
char writeAuth : 1;
char superAuth : 1;
struct _sql_obj *pSql;
struct _sql_obj *pHb;
struct _sql_obj *sqlList;
struct SSqlObj *pSql;
struct SSqlObj *pHb;
struct SSqlObj *sqlList;
struct _sstream *streamList;
pthread_mutex_t mutex;
} STscObj;
typedef struct _sql_obj {
typedef struct SSqlObj {
void * signature;
STscObj *pTscObj;
void (*fp)();
......@@ -340,8 +340,8 @@ typedef struct _sql_obj {
uint8_t numOfSubs;
char * asyncTblPos;
void * pTableHashList;
struct _sql_obj **pSubs;
struct _sql_obj * prev, *next;
struct SSqlObj **pSubs;
struct SSqlObj * prev, *next;
} SSqlObj;
typedef struct _sstream {
......@@ -442,10 +442,8 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
// transfer SSqlInfo to SqlCmd struct
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
void tscQueueAsyncFreeResult(SSqlObj *pSql);
int32_t tscToSQLCmd(SSqlObj* pSql, struct SSqlInfo* pInfo);
extern void * pVnodeConn;
extern void * pTscMgmtConn;
......@@ -453,7 +451,6 @@ extern void * tscCacheHandle;
extern int32_t globalCode;
extern int slaveIndex;
extern void * tscTmr;
extern void * tscConnCache;
extern void * tscQhandle;
extern int tscKeepConn[];
extern int tsInsertHeadSize;
......
......@@ -22,7 +22,6 @@
#include "tscUtil.h"
#include "tsclient.h"
#include "tsocket.h"
#include "tscSQLParser.h"
#include "tutil.h"
#include "tnote.h"
#include "tsched.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tglobalcfg.h"
#include "tlog.h"
#include "tmempool.h"
#include "tsclient.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
typedef struct _c_hash_t {
uint32_t ip;
uint16_t port;
struct _c_hash_t *prev;
struct _c_hash_t *next;
void * data;
uint64_t time;
} SConnHash;
typedef struct {
SConnHash ** connHashList;
mpool_h connHashMemPool;
int maxSessions;
int total;
int * count;
int64_t keepTimer;
pthread_mutex_t mutex;
void (*cleanFp)(void *);
void *tmrCtrl;
void *pTimer;
} SConnCache;
int taosHashConn(void *handle, uint32_t ip, uint16_t port, char *user) {
SConnCache *pObj = (SConnCache *)handle;
int hash = 0;
// size_t user_len = strlen(user);
hash = ip >> 16;
hash += (unsigned short)(ip & 0xFFFF);
hash += port;
while (*user != '\0') {
hash += *user;
user++;
}
hash = hash % pObj->maxSessions;
return hash;
}
void taosRemoveExpiredNodes(SConnCache *pObj, SConnHash *pNode, int hash, uint64_t time) {
if (pNode == NULL) return;
if (time < pObj->keepTimer + pNode->time) return;
SConnHash *pPrev = pNode->prev, *pNext;
while (pNode) {
(*pObj->cleanFp)(pNode->data);
pNext = pNode->next;
pObj->total--;
pObj->count[hash]--;
tscTrace("%p ip:0x%x:%hu:%d:%p removed, connections in cache:%d", pNode->data, pNode->ip, pNode->port, hash, pNode,
pObj->count[hash]);
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pNode = pNext;
}
if (pPrev)
pPrev->next = NULL;
else
pObj->connHashList[hash] = NULL;
}
void *taosAddConnIntoCache(void *handle, void *data, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
uint64_t time = taosGetTimestampMs();
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (data == NULL) {
tscTrace("data:%p ip:%p:%d not valid, not added in cache", data, ip, port);
return NULL;
}
hash = taosHashConn(pObj, ip, port, user);
pNode = (SConnHash *)taosMemPoolMalloc(pObj->connHashMemPool);
pNode->ip = ip;
pNode->port = port;
pNode->data = data;
pNode->prev = NULL;
pNode->time = time;
pthread_mutex_lock(&pObj->mutex);
pNode->next = pObj->connHashList[hash];
if (pObj->connHashList[hash] != NULL) (pObj->connHashList[hash])->prev = pNode;
pObj->connHashList[hash] = pNode;
pObj->total++;
pObj->count[hash]++;
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
pthread_mutex_unlock(&pObj->mutex);
tscTrace("%p ip:0x%x:%hu:%d:%p added, connections in cache:%d", data, ip, port, hash, pNode, pObj->count[hash]);
return pObj;
}
void taosCleanConnCache(void *handle, void *tmrId) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
if (pObj->pTimer != tmrId) return;
uint64_t time = taosGetTimestampMs();
for (hash = 0; hash < pObj->maxSessions; ++hash) {
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pthread_mutex_unlock(&pObj->mutex);
}
// tscTrace("timer, total connections in cache:%d", pObj->total);
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
void *taosGetConnFromCache(void *handle, uint32_t ip, uint16_t port, char *user) {
int hash;
SConnHash * pNode;
SConnCache *pObj;
void * pData = NULL;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
uint64_t time = taosGetTimestampMs();
hash = taosHashConn(pObj, ip, port, user);
pthread_mutex_lock(&pObj->mutex);
pNode = pObj->connHashList[hash];
while (pNode) {
if (time >= pObj->keepTimer + pNode->time) {
taosRemoveExpiredNodes(pObj, pNode, hash, time);
pNode = NULL;
break;
}
if (pNode->ip == ip && pNode->port == port) break;
pNode = pNode->next;
}
if (pNode) {
taosRemoveExpiredNodes(pObj, pNode->next, hash, time);
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
pObj->connHashList[hash] = pNode->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
}
pData = pNode->data;
taosMemPoolFree(pObj->connHashMemPool, (char *)pNode);
pObj->total--;
pObj->count[hash]--;
}
pthread_mutex_unlock(&pObj->mutex);
if (pData) {
tscTrace("%p ip:0x%x:%hu:%d:%p retrieved, connections in cache:%d", pData, ip, port, hash, pNode, pObj->count[hash]);
}
return pData;
}
void *taosOpenConnCache(int maxSessions, void (*cleanFp)(void *), void *tmrCtrl, int64_t keepTimer) {
SConnHash **connHashList;
mpool_h connHashMemPool;
SConnCache *pObj;
connHashMemPool = taosMemPoolInit(maxSessions, sizeof(SConnHash));
if (connHashMemPool == 0) return NULL;
connHashList = calloc(sizeof(SConnHash *), maxSessions);
if (connHashList == 0) {
taosMemPoolCleanUp(connHashMemPool);
return NULL;
}
pObj = malloc(sizeof(SConnCache));
if (pObj == NULL) {
taosMemPoolCleanUp(connHashMemPool);
free(connHashList);
return NULL;
}
memset(pObj, 0, sizeof(SConnCache));
pObj->count = calloc(sizeof(int), maxSessions);
pObj->total = 0;
pObj->keepTimer = keepTimer;
pObj->maxSessions = maxSessions;
pObj->connHashMemPool = connHashMemPool;
pObj->connHashList = connHashList;
pObj->cleanFp = cleanFp;
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosCleanConnCache, pObj->keepTimer * 2, pObj, pObj->tmrCtrl, &pObj->pTimer);
pthread_mutex_init(&pObj->mutex, NULL);
return pObj;
}
void taosCloseConnCache(void *handle) {
SConnCache *pObj;
pObj = (SConnCache *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return;
pthread_mutex_lock(&pObj->mutex);
taosTmrStopA(&(pObj->pTimer));
if (pObj->connHashMemPool) taosMemPoolCleanUp(pObj->connHashMemPool);
tfree(pObj->connHashList);
tfree(pObj->count)
pthread_mutex_unlock(&pObj->mutex);
pthread_mutex_destroy(&pObj->mutex);
memset(pObj, 0, sizeof(SConnCache));
free(pObj);
}
......@@ -14,20 +14,21 @@
*/
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "qhistogram.h"
#include "qinterpolation.h"
#include "qpercentile.h"
#include "qsyntaxtreefunction.h"
#include "qtsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "thistogram.h"
#include "tinterpolation.h"
#include "tlog.h"
#include "tscJoinProcess.h"
#include "tscSyntaxtreefunction.h"
#include "tscompression.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "taosdef.h"
#include "tutil.h"
#include "tpercentile.h"
#define GET_INPUT_CHAR(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_CHAR_INDEX(x, y) (GET_INPUT_CHAR(x) + (y) * (x)->inputBytes)
......
此差异已折叠。
......@@ -21,7 +21,7 @@
#include "tsclient.h"
#include "taosdef.h"
#include "textbuffer.h"
#include "qextbuffer.h"
#include "tscSecondaryMerge.h"
#include "tschemautil.h"
#include "tsocket.h"
......
......@@ -1314,11 +1314,10 @@ int tsParseSql(SSqlObj *pSql, bool multiVnodeInsertion) {
* the error handle callback function can rightfully restore the user defined function (fp)
*/
if (pSql->fp != NULL && multiVnodeInsertion) {
assert(pSql->fetchFp == NULL);
pSql->fetchFp = pSql->fp;
// replace user defined callback function with multi-insert proxy function
pSql->fp = tscAsyncInsertMultiVnodesProxy;
pSql->fp = (void(*)())launchMultivnodeInsert;
}
ret = tsParseInsertSql(pSql);
......
......@@ -15,7 +15,6 @@
#include "taos.h"
#include "tsclient.h"
#include "tscSQLParser.h"
#include "tscUtil.h"
#include "ttimer.h"
#include "taosmsg.h"
......
......@@ -17,17 +17,17 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "taos.h"
#include "taosmsg.h"
#include "tstoken.h"
#include "tstrbuild.h"
#include "ttime.h"
#include "tast.h"
#include "tscSQLParser.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "tsclient.h"
#include "ttokendef.h"
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
......@@ -59,7 +59,7 @@ static int32_t setObjFullName(char* fullName, const char* account, SSQLToken* pD
static void getColumnName(tSQLExprItem* pItem, char* resultFieldName, int32_t nameLength);
static void getRevisedName(char* resultFieldName, int32_t functionId, int32_t maxLen, char* columnName);
static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool isResultColumn);
static int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIdx, tSQLExprItem* pItem, bool finalResult);
static int32_t insertResultField(SQueryInfo* pQueryInfo, int32_t outputIndex, SColumnList* pIdList, int16_t bytes,
int8_t type, char* fieldName, SSqlExpr* pSqlExpr);
static int32_t changeFunctionID(int32_t optr, int16_t* functionId);
......
......@@ -18,7 +18,6 @@
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSQLParser.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tschemautil.h"
......@@ -605,7 +604,7 @@ int tscProcessSql(SSqlObj *pSql) {
}
}
}
if (tscIsTwoStageMergeMetricQuery(pQueryInfo, 0)) {
/*
* (ref. line: 964)
......@@ -615,24 +614,16 @@ int tscProcessSql(SSqlObj *pSql) {
* when pSql being released, pSql->fp == NULL, it may pass the check of pSql->fp == NULL,
* which causes deadlock. So we keep it as local variable.
*/
void *fp = pSql->fp;
if (tscLaunchSTableSubqueries(pSql) != TSDB_CODE_SUCCESS) {
return pRes->code;
}
if (fp == NULL) {
tsem_post(&pSql->emptyRspSem);
tsem_wait(&pSql->rspSem);
tsem_post(&pSql->emptyRspSem);
// set the command flag must be after the semaphore been correctly set.
pSql->cmd.command = TSDB_SQL_RETRIEVE_METRIC;
}
return pSql->res.code;
} else if (pSql->fp == (void(*)())launchMultivnodeInsert) { // multi-vnodes insertion
launchMultivnodeInsert(pSql);
return pSql->res.code;
}
return doProcessSql(pSql);
}
......
......@@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tast.h>
#include "hash.h"
#include "os.h"
#include "tcache.h"
......@@ -22,7 +21,6 @@
#include "trpc.h"
#include "tscJoinProcess.h"
#include "tscProfile.h"
#include "tscSQLParser.h"
#include "tscSecondaryMerge.h"
#include "tscUtil.h"
#include "tsclient.h"
......@@ -30,6 +28,8 @@
#include "tsocket.h"
#include "ttimer.h"
#include "tutil.h"
#include "ttokendef.h"
#include "qast.h"
TAOS *taos_connect_imp(const char *ip, const char *user, const char *pass, const char *db, uint16_t port,
void (*fp)(void *, TAOS_RES *, int), void *param, void **taos) {
......
......@@ -15,7 +15,6 @@
#include "os.h"
#include "tlog.h"
#include "tscSQLParser.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
......
......@@ -38,7 +38,6 @@ int initialized = 0;
int slaveIndex;
void * tscTmr;
void * tscQhandle;
void * tscConnCache;
void * tscCheckDiskUsageTmr;
int tsInsertHeadSize;
......@@ -188,8 +187,6 @@ void taos_init_imp() {
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
initialized = 1;
tscTrace("client is initialized successfully");
tsInsertHeadSize = tsRpcHeadSize + sizeof(SShellSubmitMsg);
......
......@@ -16,8 +16,8 @@
#include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qast.h"
#include "taosmsg.h"
#include "tast.h"
#include "tcache.h"
#include "tkey.h"
#include "tmd5.h"
......@@ -2105,7 +2105,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
void tscDoQuery(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
void* fp = pSql->fp;
pSql->res.code = TSDB_CODE_SUCCESS;
......@@ -2121,7 +2120,6 @@ void tscDoQuery(SSqlObj* pSql) {
} else {
// pSql may be released in this function if it is a async insertion.
tscProcessSql(pSql);
if (NULL == fp) tscProcessMultiVnodesInsert(pSql);
}
}
}
......@@ -2321,3 +2319,94 @@ void tscTryQueryNextClause(SSqlObj* pSql, void (*queryFp)()) {
tscProcessSql(pSql);
}
}
typedef struct SinsertSupporter {
SSubqueryState* pState;
SSqlObj* pSql;
} SinsertSupporter;
void multiVnodeInsertMerge(void* param, TAOS_RES* tres, int numOfRows) {
SinsertSupporter *pSupporter = (SinsertSupporter *)param;
SSqlObj* pParentObj = pSupporter->pSql;
SSqlCmd* pParentCmd = &pParentObj->cmd;
SSubqueryState* pState = pSupporter->pState;
int32_t total = pState->numOfTotal;
// increase the total inserted rows
if (numOfRows > 0) {
pParentObj->res.numOfRows += numOfRows;
}
int32_t completed = atomic_add_fetch_32(&pState->numOfCompleted, 1);
if (completed < total) {
return;
}
tscTrace("%p Async insertion completed, total inserted:%d", pParentObj, pParentObj->res.numOfRows);
// release data block data
pParentCmd->pDataBlocks = tscDestroyBlockArrayList(pParentCmd->pDataBlocks);
// restore user defined fp
pParentObj->fp = pParentObj->fetchFp;
// all data has been sent to vnode, call user function
(*pParentObj->fp)(pParentObj->param, tres, numOfRows);
}
int32_t launchMultivnodeInsert(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->qhandle = 1; // hack the qhandle check
SDataBlockList *pDataBlocks = pCmd->pDataBlocks;
pSql->pSubs = calloc(pDataBlocks->nSize, POINTER_BYTES);
pSql->numOfSubs = pDataBlocks->nSize;
assert(pDataBlocks->nSize > 0);
tscTrace("%p submit data to %d vnode(s)", pSql, pDataBlocks->nSize);
SSubqueryState *pState = calloc(1, sizeof(SSubqueryState));
pState->numOfTotal = pSql->numOfSubs;
pRes->code = TSDB_CODE_SUCCESS;
int32_t i = 0;
for (; i < pSql->numOfSubs; ++i) {
SinsertSupporter* pSupporter = calloc(1, sizeof(SinsertSupporter));
pSupporter->pSql = pSql;
pSupporter->pState = pState;
SSqlObj *pNew = createSubqueryObj(pSql, 0, multiVnodeInsertMerge, pSupporter, NULL);
if (pNew == NULL) {
tscError("%p failed to malloc buffer for subObj, orderOfSub:%d, reason:%s", pSql, i, strerror(errno));
break;
}
pSql->pSubs[i] = pNew;
tscTrace("%p sub:%p create subObj success. orderOfSub:%d", pSql, pNew, i);
}
if (i < pSql->numOfSubs) {
tscError("%p failed to prepare subObj structure and launch sub-insertion", pSql);
pRes->code = TSDB_CODE_CLI_OUT_OF_MEMORY;
return pRes->code; // free all allocated resource
}
for (int32_t j = 0; j < pSql->numOfSubs; ++j) {
SSqlObj *pSub = pSql->pSubs[j];
pSub->cmd.command = TSDB_SQL_INSERT;
int32_t code = tscCopyDataBlockToPayload(pSub, pDataBlocks->pData[j]);
if (code != TSDB_CODE_SUCCESS) {
tscTrace("%p prepare submit data block failed in async insertion, vnodeIdx:%d, total:%d, code:%d", pSql, j,
pDataBlocks->nSize, code);
}
tscTrace("%p sub:%p launch sub insert, orderOfSub:%d", pSql, pSub, j);
tscProcessSql(pSub);
}
return TSDB_CODE_SUCCESS;
}
......@@ -16,7 +16,7 @@
#ifndef TDENGINE_TPERCENTILE_H
#define TDENGINE_TPERCENTILE_H
#include "textbuffer.h"
#include "qextbuffer.h"
typedef struct MinMaxEntry {
union {
......
......@@ -14,22 +14,11 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h"
#include "mnode.h"
#include "mgmtTable.h"
#include "mgmtAcct.h"
#include "mgmtChildTable.h"
#include "mgmtDb.h"
#include "mgmtDClient.h"
#include "mgmtDb.h"
#include "mgmtDnode.h"
#include "mgmtGrant.h"
#include "mgmtMnode.h"
......@@ -37,9 +26,20 @@
#include "mgmtProfile.h"
#include "mgmtShell.h"
#include "mgmtSuperTable.h"
#include "mgmtTable.h"
#include "mgmtUser.h"
#include "mgmtVgroup.h"
#include "mnode.h"
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "taosmsg.h"
#include "tschemautil.h"
#include "tscompression.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "tstatus.h"
#include "ttime.h"
extern void *tsNormalTableSdb;
extern void *tsChildTableSdb;
......
......@@ -131,13 +131,6 @@ typedef struct tSidSet {
SColumnOrderInfo orderIdx;
} tSidSet;
/**
*
* @param fileNamePattern
* @param dstPath
*/
void getTmpfilePath(const char *fileNamePattern, char *dstPath);
/**
*
* @param inMemSize
......
......@@ -20,6 +20,10 @@
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "qextbuffer.h"
typedef struct SInterpolationInfo {
int64_t startTimestamp;
int32_t order; // order [asc/desc]
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "textbuffer.h"
#include "qextbuffer.h"
typedef struct SIDList {
uint32_t alloc;
......
......@@ -329,6 +329,18 @@ void tSQLSetColumnType(TAOS_FIELD *pField, SSQLToken *pToken);
void *ParseAlloc(void *(*mallocProc)(size_t));
enum {
TSQL_NODE_TYPE_EXPR = 0x1,
TSQL_NODE_TYPE_ID = 0x2,
TSQL_NODE_TYPE_VALUE = 0x4,
};
#define NON_ARITHMEIC_EXPR 0
#define NORMAL_ARITHMETIC 1
#define AGG_ARIGHTMEIC 2
int32_t tSQLParse(SSqlInfo *pSQLInfo, const char *pSql);
#ifdef __cplusplus
}
#endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_STSBUF_H
#define TDENGINE_STSBUF_H
#ifdef __cplusplus
extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#define MEM_BUF_SIZE (1 << 20)
#define TS_COMP_FILE_MAGIC 0x87F5EC4C
#define TS_COMP_FILE_VNODE_MAX 512
typedef struct STSList {
char* rawBuf;
int32_t allocSize;
int32_t threshold;
int32_t len;
} STSList;
typedef struct STSRawBlock {
int32_t vnode;
int64_t tag;
TSKEY* ts;
int32_t len;
} STSRawBlock;
typedef struct STSElem {
TSKEY ts;
int64_t tag;
int32_t vnode;
} STSElem;
typedef struct STSCursor {
int32_t vnodeIndex;
int32_t blockIndex;
int32_t tsIndex;
int32_t order;
} STSCursor;
typedef struct STSBlock {
int64_t tag; // tag value
int32_t numOfElem; // number of elements
int32_t compLen; // size after compressed
int32_t padding; // 0xFFFFFFFF by default, after the payload
char* payload; // actual data that is compressed
} STSBlock;
/*
* The size of buffer file should not be greater than 2G,
* and the offset of int32_t type is enough
*/
typedef struct STSVnodeBlockInfo {
int32_t vnode; // vnode id
int32_t offset; // offset set value in file
int32_t numOfBlocks; // number of total blocks
int32_t compLen; // compressed size
} STSVnodeBlockInfo;
typedef struct STSVnodeBlockInfoEx {
STSVnodeBlockInfo info;
int32_t len; // length before compress
} STSVnodeBlockInfoEx;
typedef struct STSBuf {
FILE* f;
char path[PATH_MAX];
uint32_t fileSize;
STSVnodeBlockInfoEx* pData;
int32_t numOfAlloc;
int32_t numOfVnodes;
char* assistBuf;
int32_t bufSize;
STSBlock block;
STSList tsData; // uncompressed raw ts data
uint64_t numOfTotal;
bool autoDelete;
int32_t tsOrder; // order of timestamp in ts comp buffer
STSCursor cur;
} STSBuf;
typedef struct STSBufFileHeader {
uint32_t magic; // file magic number
uint32_t numOfVnode; // number of vnode stored in current file
uint32_t tsOrder; // timestamp order in current file
} STSBufFileHeader;
STSBuf* tsBufCreate(bool autoDelete);
STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete);
STSBuf* tsBufCreateFromCompBlocks(const char* pData, int32_t numOfBlocks, int32_t len, int32_t tsOrder);
void* tsBufDestory(STSBuf* pTSBuf);
void tsBufAppend(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag, const char* pData, int32_t len);
int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf, int32_t vnodeIdx);
STSBuf* tsBufClone(STSBuf* pTSBuf);
STSVnodeBlockInfo* tsBufGetVnodeBlockInfo(STSBuf* pTSBuf, int32_t vnodeId);
void tsBufFlush(STSBuf* pTSBuf);
void tsBufResetPos(STSBuf* pTSBuf);
STSElem tsBufGetElem(STSBuf* pTSBuf);
bool tsBufNextPos(STSBuf* pTSBuf);
STSElem tsBufGetElemStartPos(STSBuf* pTSBuf, int32_t vnodeId, int64_t tag);
STSCursor tsBufGetCursor(STSBuf* pTSBuf);
void tsBufSetTraverseOrder(STSBuf* pTSBuf, int32_t order);
void tsBufSetCursor(STSBuf* pTSBuf, STSCursor* pCur);
/**
* display all data in comp block file, for debug purpose only
* @param pTSBuf
*/
void tsBufDisplay(STSBuf* pTSBuf);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_STSBUF_H
......@@ -26,7 +26,6 @@
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "tscSQLParser.h"
#include "tutil.h"
}
......
......@@ -13,19 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qast.h"
#include "os.h"
#include "qsqlparser.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tast.h"
#include "tlog.h"
#include "tscSyntaxtreefunction.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "taosdef.h"
#include "tutil.h"
#include "qsqlparser.h"
/*
*
......@@ -609,6 +608,7 @@ int32_t merge(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *
// }
//
// return pFinalRes->num;
return 0;
}
int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResultset *pFinalRes) {
......@@ -643,12 +643,13 @@ int32_t intersect(tQueryResultset *pLeft, tQueryResultset *pRight, tQueryResults
// }
//
// return pFinalRes->num;
return 0;
}
/*
* traverse the result and apply the function to each item to check if the item is qualified or not
*/
static void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) {
static UNUSED_FUNC void tSQLListTraverseOnResult(struct tSQLBinaryExpr *pExpr, __result_filter_fn_t fp, tQueryResultset *pResult) {
assert(pExpr->pLeft->nodeType == TSQL_NODE_COL && pExpr->pRight->nodeType == TSQL_NODE_VALUE);
// brutal force scan the result list and check for each item in the list
......@@ -705,7 +706,7 @@ static bool filterItem(tSQLBinaryExpr *pExpr, const void *pItem, SBinaryFilterSu
* @param pSchema tag schemas
* @param fp filter callback function
*/
static void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) {
static UNUSED_FUNC void tSQLBinaryTraverseOnResult(tSQLBinaryExpr *pExpr, tQueryResultset *pResult, SBinaryFilterSupp *param) {
int32_t n = 0;
for (int32_t i = 0; i < pResult->num; ++i) {
void *pItem = pResult->pRes[i];
......
......@@ -12,42 +12,19 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qextbuffer.h"
#include "os.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tlog.h"
#include "tsqlfunction.h"
#include "ttime.h"
#include "taosdef.h"
#include "tutil.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
int32_t tmpFileSerialNum = 0;
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[MAX_TMPFILE_PATH_LENGTH] = {0};
#ifdef WINDOWS
char *tmpDir = getenv("tmp");
if (tmpDir == NULL) {
tmpDir = "";
}
#else
char *tmpDir = "/tmp/";
#endif
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%llu-%u");
snprintf(dstPath, MAX_TMPFILE_PATH_LENGTH, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1));
}
/*
* SColumnModel is deeply copy
*/
......
......@@ -14,10 +14,10 @@
*/
#include "os.h"
#include "qhistogram.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "thistogram.h"
#include "tlosertree.h"
#include "taosdef.h"
/**
*
......
......@@ -13,12 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qinterpolation.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tinterpolation.h"
#include "tsqlfunction.h"
#include "taosdef.h"
#define INTERPOL_IS_ASC_INTERPOL(interp) ((interp)->order == TSQL_SO_ASC)
......
......@@ -15,11 +15,10 @@
#include "os.h"
#include "taosmsg.h"
#include "qpercentile.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
#include "taosdef.h"
#include "tpercentile.h"
tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) {
tExtMemBuffer *pBuffer = NULL;
......
#include "qresultBuf.h"
#include "hash.h"
#include "qextbuffer.h"
#include "taoserror.h"
#include "textbuffer.h"
#include "tlog.h"
#include "tsqlfunction.h"
#include "tresultBuf.h"
#define DEFAULT_INTERN_BUF_SIZE 16384L
......
......@@ -15,7 +15,7 @@
#include "os.h"
#include "tscSyntaxtreefunction.h"
#include "qsyntaxtreefunction.h"
#include "taosdef.h"
#include "tutil.h"
......
此差异已折叠。
......@@ -137,6 +137,13 @@ int64_t str2int64(char *str);
int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstPath);
/**
*
* @param fileNamePattern
* @param dstPath
*/
void getTmpfilePath(const char *fileNamePattern, char *dstPath);
int32_t taosInitTimer(void (*callback)(int), int32_t ms);
bool taosMbsToUcs4(char *mbs, int32_t mbs_len, char *ucs4, int32_t ucs4_max_len);
......
......@@ -520,6 +520,7 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
}
pIter->pHashObj = pHashObj;
return pIter;
}
static SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
......@@ -600,6 +601,7 @@ void *taosHashDestroyIter(SHashMutableIterator *iter) {
}
free(iter);
return NULL;
}
// for profile only
......
......@@ -27,6 +27,8 @@
#include "tlog.h"
#include "taoserror.h"
int32_t tmpFileSerialNum = 0;
int32_t strdequote(char *z) {
if (z == NULL) {
return 0;
......@@ -401,6 +403,27 @@ int32_t taosFileRename(char *fullPath, char *suffix, char delimiter, char **dstP
return rename(fullPath, *dstPath);
}
void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
const char* tdengineTmpFileNamePrefix = "tdengine-";
char tmpPath[PATH_MAX] = {0};
#ifdef WINDOWS
char *tmpDir = getenv("tmp");
if (tmpDir == NULL) {
tmpDir = "";
}
#else
char *tmpDir = "/tmp/";
#endif
strcpy(tmpPath, tmpDir);
strcat(tmpPath, tdengineTmpFileNamePrefix);
strcat(tmpPath, fileNamePrefix);
strcat(tmpPath, "-%llu-%u");
snprintf(dstPath, PATH_MAX, tmpPath, taosGetPthreadId(), atomic_add_fetch_32(&tmpFileSerialNum, 1));
}
int tasoUcs4Compare(void* f1_ucs4, void *f2_ucs4, int bytes) {
#if defined WINDOWS
for (int i = 0; i < bytes; ++i) {
......
......@@ -21,9 +21,9 @@ extern "C" {
#endif
#include "os.h"
#include "tresultBuf.h"
#include "qresultBuf.h"
#include "tinterpolation.h"
#include "qinterpolation.h"
#include "vnodeTagMgmt.h"
/*
......
......@@ -16,11 +16,9 @@
#ifndef TBASE_MNODE_SUPER_TABLE_QUERY_H
#define TBASE_MNODE_SUPER_TABLE_QUERY_H
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include "os.h"
#include "mnode.h"
#include "tast.h"
#include "qast.h"
int32_t mgmtDoJoin(SSuperTableMetaMsg* pSuperTableMetaMsg, tQueryResultset* pRes);
void mgmtReorganizeMetersInMetricMeta(SSuperTableMetaMsg* pInfo, int32_t index, tQueryResultset* pRes);
......
......@@ -16,11 +16,11 @@
#include "hash.h"
#include "hashfunc.h"
#include "os.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "ttime.h"
#include "tinterpolation.h"
#include "qinterpolation.h"
#include "tscJoinProcess.h"
#include "tscSecondaryMerge.h"
#include "tscompression.h"
......
......@@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tscJoinProcess.h"
#include "ttime.h"
#include "vnode.h"
......
......@@ -19,9 +19,9 @@
#include "hash.h"
#include "hashfunc.h"
#include "ihash.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taosmsg.h"
#include "tast.h"
#include "textbuffer.h"
#include "tscJoinProcess.h"
#include "tscompression.h"
#include "vnode.h"
......
......@@ -14,13 +14,12 @@
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "mnode.h"
#include "textbuffer.h"
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "tschemautil.h"
#include "tsqlfunction.h"
#include "tast.h"
//#include "vnodeTagMgmt.h"
typedef struct SSyntaxTreeFilterSupporter {
SSchema* pTagSchema;
......
......@@ -16,12 +16,12 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "qast.h"
#include "qextbuffer.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tutil.h"
#include "taosmsg.h"
#include "textbuffer.h"
#include "tast.h"
#include "vnodeTagMgmt.h"
#define GET_TAG_VAL_POINTER(s, col, sc, t) ((t *)(&((s)->tags[getColumnModelOffset(sc, col)])))
......
......@@ -16,13 +16,13 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tast.h"
#include "qast.h"
#include "tscUtil.h"
#include "tschemautil.h"
#include "vnode.h"
#include "vnodeDataFilterFunc.h"
#include "vnodeUtil.h"
#include "vnodeStatus.h"
#include "vnodeUtil.h"
int vnodeCheckFileIntegrity(FILE* fp) {
/*
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册