提交 d776f10d 编写于 作者: S Shengliang Guan

TD-1057

......@@ -157,7 +157,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```mysql
DROP TABLE [IF EXISTS] stb_name;
```
删除STable会自动删除通过STable创建的表。
删除STable会自动删除通过STable创建的表。
- **显示当前数据库下的所有超级表信息**
......@@ -206,7 +206,7 @@ TDengine缺省的时间戳是毫秒精度,但通过修改配置参数enableMic
```
修改超级表的标签名,从超级表修改某个标签名后,该超级表下的所有子表也会自动更新该标签名。
- **修改表标签值**
- **修改表标签值**
```mysql
ALTER TABLE tb_name SET TAG tag_name=new_tag_value;
......
......@@ -107,7 +107,7 @@ CREATE DATABASE demo replica 3;
```
一个DB里的数据会被切片分到多个vnode group,vnode group里的vnode数目就是DB的副本数,同一个vnode group里各vnode的数据是完全一致的。为保证高可用性,vnode group里的vnode一定要分布在不同的dnode里(实际部署时,需要在不同的物理机上),只要一个vgroup里超过半数的vnode处于工作状态,这个vgroup就能正常的对外服务。
一个dnode里可能有多个DB的数据,因此一个dnode离线时,可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作,那么该vnode group就无法对外服务,无法插入或读取数据,这样会影响到它所属的DB的一部分表的d读写操作。
一个dnode里可能有多个DB的数据,因此一个dnode离线时,可能会影响到多个DB。如果一个vnode group里的一半或一半以上的vnode不工作,那么该vnode group就无法对外服务,无法插入或读取数据,这样会影响到它所属的DB的一部分表的读写操作。
因为vnode的引入,无法简单的给出结论:“集群中过半dnode工作,集群就应该工作”。但是对于简单的情形,很好下结论。比如副本数为3,只有三个dnode,那如果仅有一个节点不工作,整个集群还是可以正常工作的,但如果有两个节点不工作,那整个集群就无法正常工作了。
......
......@@ -277,6 +277,9 @@ void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRo
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
char* strdup_throw(const char* str);
......
......@@ -80,8 +80,9 @@ typedef struct STableMetaInfo {
* 2. keep the vgroup index for multi-vnode insertion
*/
int32_t vgroupIndex;
char name[TSDB_TABLE_ID_LEN]; // (super) table name
SArray* tagColList; // SArray<SColumn*>, involved tag columns
char name[TSDB_TABLE_FNAME_LEN]; // (super) table name
char aliasName[TSDB_TABLE_NAME_LEN]; // alias name of table specified in query sql
SArray* tagColList; // SArray<SColumn*>, involved tag columns
} STableMetaInfo;
/* the structure for sql function in select clause */
......@@ -106,7 +107,7 @@ typedef struct SColumnIndex {
typedef struct SFieldSupInfo {
bool visible;
SExprInfo *pArithExprInfo;
SSqlExpr * pSqlExpr;
SSqlExpr *pSqlExpr;
} SFieldSupInfo;
typedef struct SFieldInfo {
......@@ -128,7 +129,7 @@ typedef struct SCond {
} SCond;
typedef struct SJoinNode {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
uint64_t uid;
int16_t tagColId;
} SJoinNode;
......@@ -162,7 +163,7 @@ typedef struct SParamInfo {
} SParamInfo;
typedef struct STableDataBlocks {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
int8_t tsSource; // where does the UNIX timestamp come from, server or client
bool ordered; // if current rows are ordered or not
int64_t vgId; // virtual group id
......@@ -255,6 +256,7 @@ typedef struct SResRec {
typedef struct {
int64_t numOfRows; // num of results in current retrieved
int64_t numOfRowsGroup; // num of results of current group
int64_t numOfTotal; // num of total results
int64_t numOfClauseTotal; // num of total result in current subclause
char * pRsp;
......@@ -301,6 +303,7 @@ typedef struct STscObj {
typedef struct SSqlObj {
void *signature;
pthread_t owner; // owner of sql object, by which it is executed
STscObj *pTscObj;
void *pRpcCtx;
void (*fp)();
......@@ -419,7 +422,6 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
//void tscGetResultColumnChr(SSqlRes *pRes, SFieldInfo* pFieldInfo, int32_t column);
static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pFieldInfo, int32_t columnIndex) {
SFieldSupInfo* pInfo = (SFieldSupInfo*) TARRAY_GET_ELEM(pFieldInfo->pSupportInfo, columnIndex);
......
......@@ -220,14 +220,13 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockFromSubquery(pSql);
} else if (pRes->completed) {
if(pCmd->command == TSDB_SQL_FETCH) {
if(pCmd->command == TSDB_SQL_FETCH || (pCmd->command >= TSDB_SQL_SERV_STATUS && pCmd->command <= TSDB_SQL_CURRENT_USER)) {
if (hasMoreVnodesToTry(pSql)) { // sequentially retrieve data from remain vnodes.
tscTryQueryNextVnode(pSql, tscAsyncQueryRowsForNextVnode);
return;
} else {
/*
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
* all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
......@@ -235,11 +234,12 @@ void taos_fetch_rows_a(TAOS_RES *taosa, void (*fp)(void *, TAOS_RES *, int), voi
}
/*
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
* 1. has reach the limitation
* 2. no remain virtual nodes to be retrieved anymore
*/
(*pSql->fetchFp)(param, pSql, 0);
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
// in case of show command, return no data
......
......@@ -1942,11 +1942,12 @@ static void do_top_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData,
static void do_bottom_function_add(STopBotInfo *pInfo, int32_t maxLen, void *pData, int64_t ts, uint16_t type,
SExtTagsInfo *pTagInfo, char *pTags, int16_t stage) {
tValuePair **pList = pInfo->res;
tVariant val = {0};
tVariantCreateFromBinary(&val, pData, tDataTypeDesc[type].nSize, type);
tValuePair **pList = pInfo->res;
assert(pList != NULL);
if (pInfo->num < maxLen) {
if (pInfo->num == 0) {
valuePairAssign(pList[pInfo->num], type, (const char*) &val.i64Key, ts, pTags, pTagInfo, stage);
......
......@@ -293,7 +293,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
char db[TSDB_DB_NAME_LEN] = {0};
extractDBName(pSql->pTscObj->db, db);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -314,7 +314,7 @@ static void tscProcessCurrentDB(SSqlObj *pSql) {
static void tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, pSql->cmd.clauseIndex);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......
......@@ -68,7 +68,7 @@ static void tscInitSqlContext(SSqlCmd *pCmd, SLocalReducer *pReducer, tOrderDesc
SSqlExpr * pExpr = tscSqlExprGet(pQueryInfo, i);
pCtx->aOutputBuf =
pReducer->pResultBuf->data + tscFieldInfoGetOffset(pQueryInfo, i) * pReducer->resColModel->capacity;
pReducer->pResultBuf->data + pExpr->offset * pReducer->resColModel->capacity;
pCtx->order = pQueryInfo->order.order;
pCtx->functionId = pExpr->functionId;
......@@ -321,6 +321,7 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
pReducer->finalRowSize = tscGetResRowLength(pQueryInfo->exprList);
pReducer->resColModel = finalmodel;
pReducer->resColModel->capacity = pReducer->nResultBufSize;
assert(pReducer->finalRowSize > 0);
if (pReducer->finalRowSize > 0) {
pReducer->resColModel->capacity /= pReducer->finalRowSize;
......@@ -328,7 +329,6 @@ void tscCreateLocalReducer(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrd
assert(pReducer->finalRowSize <= pReducer->rowSize);
pReducer->pFinalRes = calloc(1, pReducer->rowSize * pReducer->resColModel->capacity);
// pReducer->pBufForInterpo = calloc(1, pReducer->nResultBufSize);
if (pReducer->pTempBuffer == NULL || pReducer->discardData == NULL || pReducer->pResultBuf == NULL ||
/*pReducer->pBufForInterpo == NULL || */pReducer->pFinalRes == NULL || pReducer->prevRowOfInput == NULL) {
......@@ -856,24 +856,6 @@ void savePrevRecordAndSetupInterpoInfo(SLocalReducer *pLocalReducer, SQueryInfo
tColModelAppend(pModel, pLocalReducer->discardData, pLocalReducer->prevRowOfInput, 0, 1, 1);
}
// todo merge with following function
// static void reversedCopyResultToDstBuf(SQueryInfo* pQueryInfo, SSqlRes *pRes, tFilePage *pFinalDataPage) {
//
// for (int32_t i = 0; i < pQueryInfo->exprList.numOfExprs; ++i) {
// TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
//
// int32_t offset = tscFieldInfoGetOffset(pQueryInfo, i);
// char * src = pFinalDataPage->data + (pRes->numOfRows - 1) * pField->bytes + pRes->numOfRows * offset;
// char * dst = pRes->data + pRes->numOfRows * offset;
//
// for (int32_t j = 0; j < pRes->numOfRows; ++j) {
// memcpy(dst, src, (size_t)pField->bytes);
// dst += pField->bytes;
// src -= pField->bytes;
// }
// }
//}
static void reversedCopyFromInterpolationToDstBuf(SQueryInfo *pQueryInfo, SSqlRes *pRes, tFilePage **pResPages,
SLocalReducer *pLocalReducer) {
assert(0);
......@@ -907,20 +889,10 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tFilePage * pFinalDataPage = pLocalReducer->pResultBuf;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
// if (pRes->pLocalReducer != pLocalReducer) {
// /*
// * Release the SSqlObj is called, and it is int destroying function invoked by other thread.
// * However, the other thread will WAIT until current process fully completes.
// * Since the flag of release struct is set by doLocalReduce function
// */
// assert(pRes->pLocalReducer == NULL);
// }
// no interval query, no fill operation
if (pQueryInfo->intervalTime == 0 || pQueryInfo->fillType == TSDB_FILL_NONE) {
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = pFinalDataPage->num;
pRes->numOfClauseTotal += pRes->numOfRows;
if (pQueryInfo->limit.offset > 0) {
if (pQueryInfo->limit.offset < pRes->numOfRows) {
......@@ -931,22 +903,22 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
tColModelCompact(pLocalReducer->resColModel, pFinalDataPage, prevSize);
pRes->numOfRows -= pQueryInfo->limit.offset;
pRes->numOfClauseTotal -= pQueryInfo->limit.offset;
pQueryInfo->limit.offset = 0;
} else {
pQueryInfo->limit.offset -= pRes->numOfRows;
pRes->numOfRows = 0;
pRes->numOfClauseTotal = 0;
}
}
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
pRes->numOfRowsGroup += pRes->numOfRows;
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRowsGroup > pQueryInfo->limit.limit) {
/* impose the limitation of output rows on the final result */
int32_t prevSize = (int32_t)pFinalDataPage->num;
int32_t overflow = (int32_t)(pRes->numOfClauseTotal - pQueryInfo->limit.limit);
int32_t prevSize = pFinalDataPage->num;
int32_t overflow = pRes->numOfRowsGroup - pQueryInfo->limit.limit;
assert(overflow < pRes->numOfRows);
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
pRes->numOfRowsGroup = pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
pFinalDataPage->num -= overflow;
......@@ -957,6 +929,8 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
memcpy(pRes->data, pFinalDataPage->data, pRes->numOfRows * pLocalReducer->finalRowSize);
pRes->numOfClauseTotal += pRes->numOfRows;
pFinalDataPage->num = 0;
return;
}
......@@ -969,7 +943,7 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
pResPages[i] = calloc(1, sizeof(tFilePage) + pField->bytes * pLocalReducer->resColModel->capacity);
}
while (1) {
int64_t newRows = taosGenerateDataBlock(pFillInfo, pResPages, pLocalReducer->resColModel->capacity);
......@@ -986,7 +960,6 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
pRes->data = pLocalReducer->pFinalRes;
pRes->numOfRows = newRows;
pRes->numOfClauseTotal += newRows;
pQueryInfo->limit.offset = 0;
break;
......@@ -1010,15 +983,13 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
}
if (pRes->numOfRows > 0) {
if (pQueryInfo->limit.limit >= 0 && pRes->numOfClauseTotal > pQueryInfo->limit.limit) {
int32_t overflow = (int32_t)(pRes->numOfClauseTotal - pQueryInfo->limit.limit);
if (pQueryInfo->limit.limit >= 0 && pRes->numOfRows > pQueryInfo->limit.limit) {
int32_t overflow = pRes->numOfRows - pQueryInfo->limit.limit;
pRes->numOfRows -= overflow;
assert(pRes->numOfRows >= 0);
pRes->numOfClauseTotal = pQueryInfo->limit.limit;
pFinalDataPage->num -= overflow;
assert(pRes->numOfRows >= 0 && pFinalDataPage->num > 0);
/* set remain data to be discarded, and reset the interpolation information */
savePrevRecordAndSetupInterpoInfo(pLocalReducer, pQueryInfo, pFillInfo);
}
......@@ -1032,6 +1003,9 @@ static void doFillResult(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool doneO
} else { // todo bug??
reversedCopyFromInterpolationToDstBuf(pQueryInfo, pRes, pResPages, pLocalReducer);
}
pRes->numOfRowsGroup += pRes->numOfRows;
pRes->numOfClauseTotal += pRes->numOfRows;
}
pFinalDataPage->num = 0;
......@@ -1227,7 +1201,10 @@ static bool saveGroupResultInfo(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->numOfGroups += 1;
if (pRes->numOfRowsGroup > 0) {
pRes->numOfGroups += 1;
}
// the output group is limited by the slimit clause
if (reachGroupResultLimit(pQueryInfo, pRes)) {
......@@ -1266,7 +1243,12 @@ bool doGenerateFinalResults(SSqlObj *pSql, SLocalReducer *pLocalReducer, bool no
pRes->numOfRows = 0;
pQueryInfo->slimit.offset -= 1;
pLocalReducer->discard = !noMoreCurrentGroupRes;
if (pLocalReducer->discard) {
SColumnModel *pInternModel = pLocalReducer->pDesc->pColumnModel;
tColModelAppend(pInternModel, pLocalReducer->discardData, pLocalReducer->pTempBuffer->data, 0, 1, 1);
}
return false;
}
......@@ -1299,7 +1281,7 @@ void resetOutputBuf(SQueryInfo *pQueryInfo, SLocalReducer *pLocalReducer) { //
static void resetEnvForNewResultset(SSqlRes *pRes, SSqlCmd *pCmd, SLocalReducer *pLocalReducer) {
// In handling data in other groups, we need to reset the interpolation information for a new group data
pRes->numOfRows = 0;
pRes->numOfClauseTotal = 0;
pRes->numOfRowsGroup = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
......@@ -1363,7 +1345,8 @@ static bool doHandleLastRemainData(SSqlObj *pSql) {
if ((isAllSourcesCompleted(pLocalReducer) && !pLocalReducer->hasPrevRow) || pLocalReducer->pLocalDataSrc[0] == NULL ||
prevGroupCompleted) {
// if fillType == TSDB_FILL_NONE, return directly
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
if (pQueryInfo->fillType != TSDB_FILL_NONE &&
((pRes->numOfRowsGroup < pQueryInfo->limit.limit && pQueryInfo->limit.limit > 0) || (pQueryInfo->limit.limit < 0))) {
int64_t etime = (pQueryInfo->window.skey < pQueryInfo->window.ekey) ? pQueryInfo->window.ekey : pQueryInfo->window.skey;
assert(pFillInfo->numOfRows == 0);
......
......@@ -989,7 +989,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql) {
}
int validateTableName(char *tblName, int len, SSQLToken* psTblToken) {
tstrncpy(psTblToken->z, tblName, TSDB_TABLE_ID_LEN);
tstrncpy(psTblToken->z, tblName, TSDB_TABLE_FNAME_LEN);
psTblToken->n = len;
psTblToken->type = TK_ID;
......@@ -1038,7 +1038,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
if (NULL == pCmd->pTableList) {
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pCmd->pTableList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
pCmd->pDataBlocks = taosArrayInit(4, POINTER_BYTES);
if (NULL == pCmd->pTableList || NULL == pSql->cmd.pDataBlocks) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1077,7 +1077,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
pCmd->curSql = sToken.z;
char buf[TSDB_TABLE_ID_LEN];
char buf[TSDB_TABLE_FNAME_LEN];
SSQLToken sTblToken;
sTblToken.z = buf;
// Check if the table name available or not
......
此差异已折叠。
......@@ -45,19 +45,27 @@ void tscSaveSubscriptionProgress(void* sub);
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
assert(pSql != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
SRpcEpSet* pEpSet = &pSql->epSet;
pEpSet->inUse = 0;
if (pVgroupInfo == NULL) {
pEpSet->numOfEps = 0;
return;
}
pEpSet->inUse = 0;
// apply the FQDN string length check here
bool hasFqdn = false;
pEpSet->numOfEps = pVgroupInfo->numOfEps;
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
if (!hasFqdn) {
hasFqdn = (strlen(pEpSet->fqdn[i]) > 0);
}
}
assert(hasFqdn);
}
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
taosCorBeginRead(&tscMgmtEpSet.version);
*epSet = tscMgmtEpSet.epSet;
......@@ -127,21 +135,6 @@ void tscPrintMgmtEp() {
}
}
/*
* For each management node, try twice at least in case of poor network situation.
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
* the poor network quality. And then, the second retry get the response with redirection command.
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
*/
UNUSED_FUNC
static int32_t tscGetMgmtConnMaxRetryTimes() {
int32_t factor = 2;
SRpcEpSet dump;
tscDumpMgmtEpSet(&dump);
return dump.numOfEps * factor;
}
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
STscObj *pObj = (STscObj *)param;
if (pObj == NULL) return;
......@@ -424,21 +417,18 @@ int doProcessSql(SSqlObj *pSql) {
}
int tscProcessSql(SSqlObj *pSql) {
char * name = NULL;
char *name = NULL;
SSqlCmd *pCmd = &pSql->cmd;
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
STableMetaInfo *pTableMetaInfo = NULL;
uint32_t type = 0;
if (pQueryInfo != NULL) {
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pTableMetaInfo != NULL) {
name = pTableMetaInfo->name;
}
name = (pTableMetaInfo != NULL)? pTableMetaInfo->name:NULL;
type = pQueryInfo->type;
// while numOfTables equals to 0, it must be Heartbeat
assert((pQueryInfo->numOfTables == 0 && pQueryInfo->command == TSDB_SQL_HB) || pQueryInfo->numOfTables > 0);
}
......@@ -450,7 +440,6 @@ int tscProcessSql(SSqlObj *pSql) {
return pSql->res.code;
}
} else if (pCmd->command < TSDB_SQL_LOCAL) {
//pSql->epSet = tscMgmtEpSet;
} else { // local handler
return (*tscProcessMsgRsp[pCmd->command])(pSql);
......@@ -597,11 +586,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
} else {
pVgroupInfo = &pTableMeta->vgroupInfo;
}
tscSetDnodeEpSet(pSql, pVgroupInfo);
if (pVgroupInfo != NULL) {
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
}
assert(pVgroupInfo != NULL);
tscSetDnodeEpSet(pSql, pVgroupInfo);
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
pTableIdInfo->tid = htonl(pTableMeta->id.tid);
......@@ -1460,7 +1449,7 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscSetResultPointer(pQueryInfo, pRes);
tscCreateResPointerInfo(pRes, pQueryInfo);
}
pRes->row = 0;
......@@ -1562,7 +1551,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
// fill head info
SMgmtHead *pMgmt = (SMgmtHead *)(pCmd->payload + tsRpcHeadSize);
memset(pMgmt->db, 0, TSDB_TABLE_ID_LEN); // server don't need the db
memset(pMgmt->db, 0, TSDB_TABLE_FNAME_LEN); // server don't need the db
SCMMultiTableInfoMsg *pInfoMsg = (SCMMultiTableInfoMsg *)(pCmd->payload + tsRpcHeadSize + sizeof(SMgmtHead));
pInfoMsg->numOfTables = htonl((int32_t)pCmd->count);
......@@ -1603,7 +1592,7 @@ int tscBuildMultiMeterMetaMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
//// tagLen += strlen(pQueryInfo->tagCond.tbnameCond.cond) * TSDB_NCHAR_SIZE;
//// }
////
//// int32_t joinCondLen = (TSDB_TABLE_ID_LEN + sizeof(int16_t)) * 2;
//// int32_t joinCondLen = (TSDB_TABLE_FNAME_LEN + sizeof(int16_t)) * 2;
//// int32_t elemSize = sizeof(SSuperTableMetaElemMsg) * pQueryInfo->numOfTables;
////
//// int32_t colSize = pQueryInfo->groupbyExpr.numOfGroupCols*sizeof(SColIndex);
......@@ -1884,11 +1873,10 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
}
pMsg += size;
}
pMsg += size;
}
return pSql->res.code;
......@@ -1966,7 +1954,7 @@ int tscProcessShowRsp(SSqlObj *pSql) {
}
int tscProcessConnectRsp(SSqlObj *pSql) {
char temp[TSDB_TABLE_ID_LEN * 2];
char temp[TSDB_TABLE_FNAME_LEN * 2];
STscObj *pObj = pSql->pTscObj;
SSqlRes *pRes = &pSql->res;
......@@ -2114,21 +2102,6 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
return 0;
}
int tscProcessRetrieveRspFromLocal(SSqlObj *pSql) {
SSqlRes * pRes = &pSql->res;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
pRes->numOfRows = htonl(pRetrieve->numOfRows);
pRes->data = pRetrieve->data;
tscSetResultPointer(pQueryInfo, pRes);
pRes->row = 0;
return 0;
}
void tscTableMetaCallBack(void *param, TAOS_RES *res, int code);
static int32_t getTableMetaFromMgmt(SSqlObj *pSql, STableMetaInfo *pTableMetaInfo) {
......
......@@ -233,22 +233,21 @@ static void waitForRetrieveRsp(void *param, TAOS_RES *tres, int numOfRows) {
tsem_post(&pSql->rspSem);
}
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
int32_t sqlLen = (int32_t)strlen(sqlstr);
if (sqlLen > tsMaxSQLStringLen) {
if (sqlLen > (uint32_t)tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_INVALID_SQL;
return NULL;
}
taosNotePrintTsc(sqlstr);
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
......@@ -259,35 +258,14 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
tsem_init(&pSql->rspSem, 0, 0);
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
// wait for the callback function to post the semaphore
tsem_wait(&pSql->rspSem);
return pSql;
return pSql;
}
TAOS_RES* taos_query_c(TAOS *taos, const char *sqlstr, uint32_t sqlLen) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) {
terrno = TSDB_CODE_TSC_DISCONNECTED;
return NULL;
}
if (sqlLen > (uint32_t)tsMaxSQLStringLen) {
tscError("sql string exceeds max length:%d", tsMaxSQLStringLen);
terrno = TSDB_CODE_TSC_INVALID_SQL;
return NULL;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
if (pSql == NULL) {
tscError("failed to malloc sqlObj");
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL;
}
doAsyncQuery(pObj, pSql, waitForQueryRsp, taos, sqlstr, sqlLen);
tsem_wait(&pSql->rspSem);
return pSql;
TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
return taos_query_c(taos, sqlstr, strlen(sqlstr));
}
int taos_result_precision(TAOS_RES *res) {
SSqlObj *pSql = (SSqlObj *)res;
if (pSql == NULL || pSql->signature != pSql) return 0;
......@@ -423,7 +401,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
pCmd->command == TSDB_SQL_INSERT) {
return NULL;
}
// set the sql object owner
tscSetSqlOwner(pSql);
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
......@@ -442,7 +423,10 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
tsem_wait(&pSql->rspSem);
}
return doSetResultRowData(pSql, true);
void* data = doSetResultRowData(pSql, true);
tscClearSqlOwner(pSql);
return data;
}
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
......@@ -510,7 +494,7 @@ int taos_select_db(TAOS *taos, const char *db) {
}
// send free message to vnode to free qhandle and corresponding resources in vnode
static bool tscFreeQhandleInVnode(SSqlObj* pSql) {
static bool tscKillQueryInVnode(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlRes* pRes = &pSql->res;
......@@ -558,16 +542,14 @@ void taos_free_result(TAOS_RES *res) {
}
pQueryInfo->type = TSDB_QUERY_TYPE_FREE_RESOURCE;
if (!tscFreeQhandleInVnode(pSql)) {
if (!tscKillQueryInVnode(pSql)) {
tscFreeSqlObj(pSql);
tscDebug("%p sqlObj is freed by app", pSql);
}
}
// todo should not be used in async query
int taos_errno(TAOS_RES *tres) {
SSqlObj *pSql = (SSqlObj *) tres;
if (pSql == NULL || pSql->signature != pSql) {
return terrno;
}
......@@ -813,7 +795,7 @@ static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t t
}
char *nextStr;
char tblName[TSDB_TABLE_ID_LEN];
char tblName[TSDB_TABLE_FNAME_LEN];
int payloadLen = 0;
char *pMsg = pCmd->payload;
while (1) {
......
......@@ -1026,9 +1026,11 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
}
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, pCmd->clauseIndex);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * pQueryInfo->fieldsInfo.numOfOutput);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
int32_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
pRes->pColumnIndex = calloc(1, sizeof(SColumnIndex) * numOfExprs);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
int32_t tableIndexOfSub = -1;
......@@ -1045,8 +1047,8 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
SSqlCmd* pSubCmd = &pSql->pSubs[tableIndexOfSub]->cmd;
SQueryInfo* pSubQueryInfo = tscGetQueryInfoDetail(pSubCmd, 0);
size_t numOfExprs = taosArrayGetSize(pSubQueryInfo->exprList);
for (int32_t k = 0; k < numOfExprs; ++k) {
size_t numOfSubExpr = taosArrayGetSize(pSubQueryInfo->exprList);
for (int32_t k = 0; k < numOfSubExpr; ++k) {
SSqlExpr* pSubExpr = tscSqlExprGet(pSubQueryInfo, k);
if (pExpr->functionId == pSubExpr->functionId && pExpr->colInfo.colId == pSubExpr->colInfo.colId) {
pRes->pColumnIndex[i] = (SColumnIndex){.tableIndex = tableIndexOfSub, .columnIndex = k};
......@@ -1054,6 +1056,10 @@ void tscSetupOutputColumnIndex(SSqlObj* pSql) {
}
}
}
// restore the offset value for super table query in case of final result.
tscRestoreSQLFuncForSTableQuery(pQueryInfo);
tscFieldInfoUpdateOffset(pQueryInfo);
}
void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
......@@ -1079,7 +1085,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(taos_errno(pSql) == code);
tscError("%p abort query, code:%d, global code:%d", pSql, code, pParentSql->res.code);
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code;
quitAllSubquery(pParentSql, pSupporter);
......@@ -2052,9 +2058,7 @@ void tscBuildResFromSubqueries(SSqlObj *pSql) {
}
while (1) {
if (pRes->row < pRes->numOfRows) {
assert(0);
}
assert (pRes->row >= pRes->numOfRows);
doBuildResFromSubqueries(pSql);
tsem_post(&pSql->rspSem);
......
......@@ -644,7 +644,7 @@ int32_t tscMergeTableDataBlocks(SSqlObj* pSql, SArray* pTableDataBlockList) {
STableDataBlocks* pOneTableBlock = taosArrayGetP(pTableDataBlockList, 0);
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
void* pVnodeDataBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
SArray* pVnodeDataBlockList = taosArrayInit(8, POINTER_BYTES);
size_t total = taosArrayGetSize(pTableDataBlockList);
......@@ -858,12 +858,13 @@ void tscFieldInfoCopy(SFieldInfo* dst, const SFieldInfo* src) {
}
TAOS_FIELD* tscFieldInfoGetField(SFieldInfo* pFieldInfo, int32_t index) {
assert(index < pFieldInfo->numOfOutput);
return TARRAY_GET_ELEM(pFieldInfo->pFields, index);
}
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pQueryInfo->fieldsInfo, index);
assert(pInfo != NULL);
assert(pInfo != NULL && pInfo->pSqlExpr != NULL);
return pInfo->pSqlExpr->offset;
}
......@@ -1680,6 +1681,77 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
return pNew;
}
// current sql function is not direct output result, so create a dummy output field
static void doSetNewFieldInfo(SQueryInfo* pNewQueryInfo, SSqlExpr* pExpr) {
TAOS_FIELD f = {.type = pExpr->resType, .bytes = pExpr->resBytes};
tstrncpy(f.name, pExpr->aliasName, sizeof(f.name));
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, &f);
pInfo1->pSqlExpr = pExpr;
pInfo1->visible = false;
}
static void doSetSqlExprAndResultFieldInfo(SQueryInfo* pQueryInfo, SQueryInfo* pNewQueryInfo, int64_t uid) {
int32_t numOfOutput = tscSqlExprNumOfExprs(pNewQueryInfo);
if (numOfOutput == 0) {
return;
}
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
// set the field info in pNewQueryInfo object
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->uid == uid) {
if (i < pFieldInfo->numOfOutput) {
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i);
if (pInfo->pSqlExpr != NULL) {
TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
assert(strcmp(p->name, pExpr->aliasName) == 0);
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
*pInfo1 = *pInfo;
} else {
assert(pInfo->pArithExprInfo != NULL);
doSetNewFieldInfo(pNewQueryInfo, pExpr);
}
} else { // it is a arithmetic column, does not have actual field for sqlExpr, so build it
doSetNewFieldInfo(pNewQueryInfo, pExpr);
}
}
}
// make sure the the sqlExpr for each fields is correct
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
// update the pSqlExpr pointer in SFieldSupInfo according the field name
// make sure the pSqlExpr point to the correct SqlExpr in pNewQueryInfo, not SqlExpr in pQueryInfo
for (int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
bool matched = false;
for (int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1;
matched = true;
break;
}
}
assert(matched);
}
tscFieldInfoUpdateOffset(pNewQueryInfo);
}
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void* param, int32_t cmd, SSqlObj* pPrevSql) {
SSqlCmd* pCmd = &pSql->cmd;
SSqlObj* pNew = (SSqlObj*)calloc(1, sizeof(SSqlObj));
......@@ -1773,49 +1845,7 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
uint64_t uid = pTableMetaInfo->pTableMeta->id.uid;
tscSqlExprCopy(pNewQueryInfo->exprList, pQueryInfo->exprList, uid, true);
int32_t numOfOutput = (int32_t)tscSqlExprNumOfExprs(pNewQueryInfo);
if (numOfOutput > 0) { // todo refactor to extract method
size_t numOfExprs = tscSqlExprNumOfExprs(pQueryInfo);
SFieldInfo* pFieldInfo = &pQueryInfo->fieldsInfo;
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
if (pExpr->uid == uid) {
TAOS_FIELD* p = tscFieldInfoGetField(pFieldInfo, i);
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(pFieldInfo, i);
SFieldSupInfo* pInfo1 = tscFieldInfoAppend(&pNewQueryInfo->fieldsInfo, p);
*pInfo1 = *pInfo;
}
}
// make sure the the sqlExpr for each fields is correct
// todo handle the agg arithmetic expression
numOfExprs = tscSqlExprNumOfExprs(pNewQueryInfo);
for(int32_t f = 0; f < pNewQueryInfo->fieldsInfo.numOfOutput; ++f) {
TAOS_FIELD* field = tscFieldInfoGetField(&pNewQueryInfo->fieldsInfo, f);
bool matched = false;
for(int32_t k1 = 0; k1 < numOfExprs; ++k1) {
SSqlExpr* pExpr1 = tscSqlExprGet(pNewQueryInfo, k1);
if (strcmp(field->name, pExpr1->aliasName) == 0) { // establish link according to the result field name
SFieldSupInfo* pInfo = tscFieldInfoGetSupp(&pNewQueryInfo->fieldsInfo, f);
pInfo->pSqlExpr = pExpr1;
matched = true;
break;
}
}
assert(matched);
}
tscFieldInfoUpdateOffset(pNewQueryInfo);
}
doSetSqlExprAndResultFieldInfo(pQueryInfo, pNewQueryInfo, uid);
pNew->fp = fp;
pNew->fetchFp = fp;
......@@ -2013,6 +2043,10 @@ bool hasMoreVnodesToTry(SSqlObj* pSql) {
}
int32_t numOfVgroups = pTableMetaInfo->vgroupList->numOfVgroups;
if (pTableMetaInfo->pVgroupTables != NULL) {
numOfVgroups = taosArrayGetSize(pTableMetaInfo->pVgroupTables);
}
return tscNonOrderedProjectionQueryOnSTable(pQueryInfo, 0) &&
(!tscHasReachLimitation(pQueryInfo, pRes)) && (pTableMetaInfo->vgroupIndex < numOfVgroups - 1);
}
......@@ -2200,3 +2234,21 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
return 0;
}
bool tscSetSqlOwner(SSqlObj* pSql) {
SSqlRes* pRes = &pSql->res;
// set the sql object owner
uint64_t threadId = taosGetPthreadId();
if (atomic_val_compare_exchange_64(&pSql->owner, 0, threadId) != 0) {
pRes->code = TSDB_CODE_QRY_IN_EXEC;
return false;
}
return true;
}
void tscClearSqlOwner(SSqlObj* pSql) {
assert(pSql->owner != 0);
atomic_store_64(&pSql->owner, 0);
}
\ No newline at end of file
......@@ -202,8 +202,9 @@ static void *dnodeProcessReadQueue(void *param) {
break;
}
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type);
dDebug("%p, msg:%s will be processed in vread queue, qtype:%d, msg:%p", pReadMsg->rpcMsg.ahandle,
taosMsg[pReadMsg->rpcMsg.msgType], type, pReadMsg);
int32_t code = vnodeProcessRead(pVnode, pReadMsg);
if (type == TAOS_QTYPE_RPC && code != TSDB_CODE_QRY_NOT_READY) {
......
......@@ -76,6 +76,9 @@ void* qGetResultRetrieveMsg(qinfo_t qinfo);
*/
int32_t qKillQuery(qinfo_t qinfo);
int32_t qQueryCompleted(qinfo_t qinfo);
/**
* destroy query info structure
* @param qHandle
......
......@@ -232,7 +232,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33
#define TSDB_TABLE_ID_LEN (TSDB_ACCT_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
#define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE
......
......@@ -219,6 +219,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUP_JOIN_KEY, 0, 0x0705, "Duplicated
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_EXCEED_TAGS_LIMIT, 0, 0x0706, "Tag conditon too many")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_NOT_READY, 0, 0x0707, "Query not ready")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_HAS_RSP, 0, 0x0708, "Query should response")
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_IN_EXEC, 0, 0x0709, "Multiple retrieval of this query")
// grant
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 0x0800, "License expired")
......
......@@ -246,13 +246,13 @@ typedef struct {
uint64_t uid;
uint64_t superTableUid;
uint64_t createdTime;
char tableId[TSDB_TABLE_ID_LEN];
char superTableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
char superTableId[TSDB_TABLE_FNAME_LEN];
char data[];
} SMDCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int8_t igExists;
int8_t getMeta;
......@@ -265,12 +265,12 @@ typedef struct {
} SCMCreateTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
int8_t igNotExists;
} SCMDropTableMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
int16_t type; /* operation type */
int16_t numOfCols; /* number of schema */
......@@ -297,7 +297,7 @@ typedef struct {
typedef struct {
char clientVersion[TSDB_VERSION_LEN];
char msgVersion[TSDB_VERSION_LEN];
char db[TSDB_TABLE_ID_LEN];
char db[TSDB_TABLE_FNAME_LEN];
} SCMConnectMsg;
typedef struct {
......@@ -347,14 +347,14 @@ typedef struct {
int32_t vgId;
int32_t sid;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
} SMDDropTableMsg;
typedef struct {
int32_t contLen;
int32_t vgId;
uint64_t uid;
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
} SMDDropSTableMsg;
typedef struct {
......@@ -527,7 +527,7 @@ typedef struct {
} SCMCreateDbMsg, SCMAlterDbMsg;
typedef struct {
char db[TSDB_TABLE_ID_LEN];
char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists;
} SCMDropDbMsg, SCMUseDbMsg;
......@@ -637,7 +637,7 @@ typedef struct {
} SMDCreateVnodeMsg, SMDAlterVnodeMsg;
typedef struct {
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
int16_t createFlag;
char tags[];
} SCMTableInfoMsg;
......@@ -664,7 +664,7 @@ typedef struct {
typedef struct STableMetaMsg {
int32_t contLen;
char tableId[TSDB_TABLE_ID_LEN]; // table id
char tableId[TSDB_TABLE_FNAME_LEN]; // table id
uint8_t numOfTags;
uint8_t precision;
uint8_t tableType;
......@@ -685,7 +685,7 @@ typedef struct SMultiTableMeta {
typedef struct {
int32_t dataLen;
char name[TSDB_TABLE_ID_LEN];
char name[TSDB_TABLE_FNAME_LEN];
char data[TSDB_MAX_TAGS_LEN + TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * TSDB_MAX_TAGS];
} STagData;
......@@ -771,7 +771,7 @@ typedef struct {
uint64_t uid;
uint64_t stime; // stream starting time
int32_t status;
char tableId[TSDB_TABLE_ID_LEN];
char tableId[TSDB_TABLE_FNAME_LEN];
} SMDAlterStreamMsg;
typedef struct {
......
......@@ -167,9 +167,14 @@ typedef struct SDataBlockInfo {
} SDataBlockInfo;
typedef struct {
size_t numOfTables;
void *pTable;
TSKEY lastKey;
} STableKeyInfo;
typedef struct {
size_t numOfTables;
SArray *pGroupList;
SHashObj *map; // speedup acquire the tableQueryInfo from STableId
SHashObj *map; // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
/**
......@@ -177,24 +182,24 @@ typedef struct {
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableqinfoGroupInfo tableId list in the form of set, seperated into different groups according to group by condition
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
TsdbQueryHandleT *tsdbQueryTables(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, void *qinfo);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each
* block
* @param tableqinfoGroupInfo tableId list.
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableqinfoGroupInfo, void *qinfo);
TsdbQueryHandleT tsdbQueryLastRow(TSDB_REPO_T *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, void *qinfo);
/**
* get the queried table object list
......@@ -260,7 +265,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL
* @param stableid. super table sid
* @param pTagCond. tag query condition
*/
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, const char *pTagCond, size_t len,
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupList,
SColIndex *pColIndex, int32_t numOfCols);
......@@ -278,7 +283,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, STableGroupInfo *pGroupInfo);
int32_t tsdbGetOneTableGroup(TSDB_REPO_T *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
......
......@@ -193,7 +193,7 @@ void mnodeDecDbRef(SDbObj *pDb) {
}
SDbObj *mnodeGetDbByTableId(char *tableId) {
char db[TSDB_TABLE_ID_LEN], *pos;
char db[TSDB_TABLE_FNAME_LEN], *pos;
// tableId format should be : acct.db.table
pos = strstr(tableId, TS_PATH_DELIMITER);
......@@ -1046,7 +1046,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pDrop->db);
if (pMsg->pDb == NULL) {
if (pDrop->ignoreNotExists) {
mDebug("db:%s, db is not exist, think drop success", pDrop->db);
mDebug("db:%s, db is not exist, treat as success", pDrop->db);
return TSDB_CODE_SUCCESS;
} else {
mError("db:%s, failed to drop, invalid db", pDrop->db);
......
......@@ -73,13 +73,13 @@ int32_t mnodeInitProfile() {
void mnodeCleanupProfile() {
if (tsMnodeConnCache != NULL) {
mInfo("conn cache is cleanup");
taosCacheCleanup(tsMnodeConnCache);
tsMnodeConnCache = NULL;
}
}
SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
#if 0
int32_t connSize = taosHashGetSize(tsMnodeConnCache->pHashTable);
if (connSize > tsMaxShellConns) {
mError("failed to create conn for user:%s ip:%s:%u, conns:%d larger than maxShellConns:%d, ", user, taosIpStr(ip),
......@@ -87,6 +87,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
terrno = TSDB_CODE_MND_TOO_MANY_SHELL_CONNS;
return NULL;
}
#endif
int32_t connId = atomic_add_fetch_32(&tsConnIndex, 1);
if (connId == 0) atomic_add_fetch_32(&tsConnIndex, 1);
......@@ -100,7 +101,7 @@ SConnObj *mnodeCreateConn(char *user, uint32_t ip, uint16_t port) {
tstrncpy(connObj.user, user, sizeof(connObj.user));
SConnObj *pConn = taosCachePut(tsMnodeConnCache, &connId, sizeof(int32_t), &connObj, sizeof(connObj), CONN_KEEP_TIME);
mDebug("connId:%d, is created, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return pConn;
}
......@@ -112,7 +113,7 @@ void mnodeReleaseConn(SConnObj *pConn) {
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheUpdateExpireTimeByName(tsMnodeConnCache, &connId, sizeof(int32_t), expireTime);
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
return NULL;
......
......@@ -874,7 +874,7 @@ void *sdbOpenTable(SSdbTableDesc *pDesc) {
if (pTable->keyType == SDB_KEY_STRING || pTable->keyType == SDB_KEY_VAR_STRING) {
hashFp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
}
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true);
pTable->iHandle = taosHashInit(pTable->hashSessions, hashFp, true, true);
tsSdbObj.numOfTables++;
tsSdbObj.tableList[pTable->tableId] = pTable;
......
......@@ -302,7 +302,7 @@ static int32_t mnodeProcessConnectMsg(SMnodeMsg *pMsg) {
SAcctObj *pAcct = pUser->pAcct;
if (pConnectMsg->db[0]) {
char dbName[TSDB_TABLE_ID_LEN * 3] = {0};
char dbName[TSDB_TABLE_FNAME_LEN * 3] = {0};
sprintf(dbName, "%x%s%s", pAcct->acctId, TS_PATH_DELIMITER, pConnectMsg->db);
SDbObj *pDb = mnodeGetDb(dbName);
if (pDb == NULL) {
......
......@@ -215,7 +215,7 @@ static int32_t mnodeChildTableActionEncode(SSdbOper *pOper) {
assert(pTable != NULL && pOper->rowData != NULL);
int32_t len = strlen(pTable->info.tableId);
if (len >= TSDB_TABLE_ID_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID;
if (len >= TSDB_TABLE_FNAME_LEN) return TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pOper->rowData, pTable->info.tableId, len);
memset(pOper->rowData + len, 0, 1);
......@@ -246,7 +246,7 @@ static int32_t mnodeChildTableActionDecode(SSdbOper *pOper) {
if (pTable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pOper->rowData);
if (len >= TSDB_TABLE_ID_LEN) {
if (len >= TSDB_TABLE_FNAME_LEN) {
free(pTable);
return TSDB_CODE_MND_INVALID_TABLE_ID;
}
......@@ -348,7 +348,7 @@ static int32_t mnodeInitChildTables() {
.tableId = SDB_TABLE_CTABLE,
.tableName = "ctables",
.hashSessions = TSDB_DEFAULT_CTABLES_HASH_SIZE,
.maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_ID_LEN + TSDB_CQ_SQL_SIZE,
.maxRowSize = sizeof(SChildTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN + TSDB_CQ_SQL_SIZE,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_VAR_STRING,
.insertFp = mnodeChildTableActionInsert,
......@@ -387,7 +387,7 @@ static void mnodeAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCt
atomic_add_fetch_32(&pStable->numOfTables, 1);
if (pStable->vgHash == NULL) {
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pStable->vgHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
}
if (pStable->vgHash != NULL) {
......@@ -479,7 +479,7 @@ static int32_t mnodeSuperTableActionEncode(SSdbOper *pOper) {
assert(pOper->pObj != NULL && pOper->rowData != NULL);
int32_t len = strlen(pStable->info.tableId);
if (len >= TSDB_TABLE_ID_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID;
if (len >= TSDB_TABLE_FNAME_LEN) len = TSDB_CODE_MND_INVALID_TABLE_ID;
memcpy(pOper->rowData, pStable->info.tableId, len);
memset(pOper->rowData + len, 0, 1);
......@@ -503,7 +503,7 @@ static int32_t mnodeSuperTableActionDecode(SSdbOper *pOper) {
if (pStable == NULL) return TSDB_CODE_MND_OUT_OF_MEMORY;
int32_t len = strlen(pOper->rowData);
if (len >= TSDB_TABLE_ID_LEN){
if (len >= TSDB_TABLE_FNAME_LEN){
free(pStable);
return TSDB_CODE_MND_INVALID_TABLE_ID;
}
......@@ -539,7 +539,7 @@ static int32_t mnodeInitSuperTables() {
.tableId = SDB_TABLE_STABLE,
.tableName = "stables",
.hashSessions = TSDB_DEFAULT_STABLES_HASH_SIZE,
.maxRowSize = sizeof(SSuperTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_ID_LEN,
.maxRowSize = sizeof(SSuperTableObj) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16) + TSDB_TABLE_FNAME_LEN,
.refCountPos = (int8_t *)(&tObj.refCount) - (int8_t *)&tObj,
.keyType = SDB_KEY_VAR_STRING,
.insertFp = mnodeSuperTableActionInsert,
......@@ -751,7 +751,7 @@ static int32_t mnodeProcessDropTableMsg(SMnodeMsg *pMsg) {
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pDrop->tableId);
if (pMsg->pTable == NULL) {
if (pDrop->igNotExists) {
mDebug("app:%p:%p, table:%s, table is not exist, think drop success", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
mDebug("app:%p:%p, table:%s, table is not exist, treat as success", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
return TSDB_CODE_SUCCESS;
} else {
mError("app:%p:%p, table:%s, failed to drop table, table not exist", pMsg->rpcMsg.ahandle, pMsg, pDrop->tableId);
......@@ -1464,7 +1464,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
// reserve space
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + 32 * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo);
for (int32_t i = 0; i < numOfTable; ++i) {
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN) * i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable != NULL && pTable->vgHash != NULL) {
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo));
......@@ -1481,7 +1481,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg);
for (int32_t i = 0; i < numOfTable; ++i) {
char * stableName = (char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i;
char * stableName = (char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
SSuperTableObj *pTable = mnodeGetSuperTable(stableName);
if (pTable == NULL) {
mError("app:%p:%p, stable:%s, not exist while get stable vgroup info", pMsg->rpcMsg.ahandle, pMsg, stableName);
......@@ -1828,7 +1828,7 @@ static int32_t mnodeSendDropChildTableMsg(SMnodeMsg *pMsg, bool needReturn) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
tstrncpy(pDrop->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN);
tstrncpy(pDrop->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
pDrop->vgId = htonl(pTable->vgId);
pDrop->contLen = htonl(sizeof(SMDDropTableMsg));
pDrop->sid = htonl(pTable->sid);
......@@ -2079,7 +2079,7 @@ static int32_t mnodeDoGetChildTableMeta(SMnodeMsg *pMsg, STableMetaMsg *pMeta) {
pMeta->sid = htonl(pTable->sid);
pMeta->precision = pDb->cfg.precision;
pMeta->tableType = pTable->info.type;
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_ID_LEN);
tstrncpy(pMeta->tableId, pTable->info.tableId, TSDB_TABLE_FNAME_LEN);
if (pTable->info.type == TSDB_CHILD_TABLE) {
pMeta->sversion = htons(pTable->superTable->sversion);
......@@ -2448,7 +2448,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta->numOfTables = 0;
for (int32_t t = 0; t < pInfo->numOfTables; ++t) {
char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_ID_LEN);
char * tableId = (char *)(pInfo->tableIds + t * TSDB_TABLE_FNAME_LEN);
SChildTableObj *pTable = mnodeGetChildTable(tableId);
if (pTable == NULL) continue;
......
......@@ -167,8 +167,8 @@ bool httpInitContext(HttpContext *pContext) {
memset(pParser, 0, sizeof(HttpParser));
pParser->pCur = pParser->pLast = pParser->buffer;
httpDebug("context:%p, fd:%d, ip:%s, thread:%s, accessTimes:%d, parsed:%d",
pContext, pContext->fd, pContext->ipstr, pContext->pThread->label, pContext->accessTimes, pContext->parsed);
httpDebug("context:%p, fd:%d, ip:%s, accessTimes:%d, parsed:%d", pContext, pContext->fd, pContext->ipstr,
pContext->accessTimes, pContext->parsed);
return true;
}
......
......@@ -192,7 +192,7 @@ static void dnodeBuildMonitorSql(char *sql, int32_t cmd) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.slowquery(ts timestamp, username "
"binary(%d), created_time timestamp, time bigint, sql binary(%d))",
tsMonitorDbName, TSDB_TABLE_ID_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN);
tsMonitorDbName, TSDB_TABLE_FNAME_LEN - 1, TSDB_SLOW_QUERY_SQL_LEN);
} else if (cmd == MONITOR_CMD_CREATE_TB_LOG) {
snprintf(sql, SQL_LENGTH,
"create table if not exists %s.log(ts timestamp, level tinyint, "
......
......@@ -48,7 +48,7 @@ typedef struct tQueryInfo {
SSchema sch; // schema of tags
char* q;
__compar_fn_t compare; // filter function
void* param; // STSchema
bool indexed; // indexed columns
} tQueryInfo;
typedef struct SExprTraverseSupp {
......
......@@ -52,10 +52,10 @@ typedef struct SWindowStatus {
typedef struct SWindowResult {
uint16_t numOfRows; // number of rows of current time window
SWindowStatus status; // this result status: closed or opened
SPosInfo pos; // Position of current result in disk-based output buffer
SResultInfo* resultInfo; // For each result column, there is a resultInfo
STimeWindow window; // The time window that current result covers.
SWindowStatus status; // this result status: closed or opened
} SWindowResult;
/**
......@@ -122,6 +122,7 @@ typedef struct SQueryCostInfo {
uint32_t discardBlocks;
uint64_t elapsedTime;
uint64_t computTime;
uint64_t internalSupSize;
} SQueryCostInfo;
typedef struct SQuery {
......@@ -184,10 +185,8 @@ enum {
typedef struct SQInfo {
void* signature;
int32_t pointsInterpo;
int32_t code; // error code to returned to client
//tsem_t dataReady;
int32_t code; // error code to returned to client
pthread_t owner; // if it is in execution
void* tsdb;
int32_t vgId;
STableGroupInfo tableGroupInfo; // table id list < only includes the STable list>
......
......@@ -29,7 +29,7 @@ extern "C" {
#define MAX_TMPFILE_PATH_LENGTH PATH_MAX
#define INITIAL_ALLOCATION_BUFFER_SIZE 64
#define DEFAULT_PAGE_SIZE (4096L) // 16k larger than the SHistoInfo
#define DEFAULT_PAGE_SIZE (1024L) // 16k larger than the SHistoInfo
typedef enum EXT_BUFFER_FLUSH_MODEL {
/*
......
......@@ -26,7 +26,12 @@
#include <string.h>
#include <assert.h>
#include <stdbool.h>
#include "qSqlparser.h"
#include "tcmdtype.h"
#include "tstoken.h"
#include "ttokendef.h"
#include "tutil.h"
#include "tvariant.h"
}
%syntax_error {
......@@ -254,7 +259,7 @@ alter_db_optr(Y) ::= alter_db_optr(Z) keep(X). { Y = Z; Y.keep = X; }
alter_db_optr(Y) ::= alter_db_optr(Z) blocks(X). { Y = Z; Y.numOfBlocks = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) comp(X). { Y = Z; Y.compressionLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) wal(X). { Y = Z; Y.walLevel = strtol(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtod(X.z, NULL, 10); }
alter_db_optr(Y) ::= alter_db_optr(Z) fsync(X). { Y = Z; Y.fsyncPeriod = strtol(X.z, NULL, 10); }
%type typename {TAOS_FIELD}
typename(A) ::= ids(X). {
......@@ -422,8 +427,35 @@ as(X) ::= . { X.n = 0; }
from(A) ::= FROM tablelist(X). {A = X;}
%type tablelist {tVariantList*}
tablelist(A) ::= ids(X) cpxName(Y). { toTSDBType(X.type); X.n += Y.n; A = tVariantListAppendToken(NULL, &X, -1);}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). { toTSDBType(X.type); X.n += Z.n; A = tVariantListAppendToken(Y, &X, -1); }
tablelist(A) ::= ids(X) cpxName(Y). {
toTSDBType(X.type);
X.n += Y.n;
A = tVariantListAppendToken(NULL, &X, -1);
A = tVariantListAppendToken(A, &X, -1); // table alias name
}
tablelist(A) ::= ids(X) cpxName(Y) ids(Z). {
toTSDBType(X.type);
toTSDBType(Z.type);
X.n += Y.n;
A = tVariantListAppendToken(NULL, &X, -1);
A = tVariantListAppendToken(A, &Z, -1);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z). {
toTSDBType(X.type);
X.n += Z.n;
A = tVariantListAppendToken(Y, &X, -1);
A = tVariantListAppendToken(A, &X, -1);
}
tablelist(A) ::= tablelist(Y) COMMA ids(X) cpxName(Z) ids(F). {
toTSDBType(X.type);
toTSDBType(F.type);
X.n += Z.n;
A = tVariantListAppendToken(Y, &X, -1);
A = tVariantListAppendToken(A, &F, -1);
}
// The value of interval should be the form of "number+[a,s,m,h,d,n,y]" or "now"
%type tmvar {SSQLToken}
......
......@@ -427,8 +427,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret != 0) {
break;
}
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else if (optr == TSDB_RELATION_GREATER || optr == TSDB_RELATION_GREATER_EQUAL) { // greater equal
bool comp = true;
......@@ -445,7 +446,8 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_GREATER) {
continue;
} else {
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false;
}
}
......@@ -458,8 +460,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (comp) {
continue;
}
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
tSkipListDestroyIter(iter);
......@@ -472,8 +475,9 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (comp) {
continue;
}
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
}
} else {
......@@ -496,12 +500,14 @@ static void tQueryIndexColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo, SArr
if (ret == 0 && optr == TSDB_RELATION_LESS) {
continue;
} else {
taosArrayPush(result, SL_GET_NODE_DATA(pNode));
STableKeyInfo info = {.pTable = *(void**)SL_GET_NODE_DATA(pNode), .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(result, &info);
comp = false; // no need to compare anymore
}
}
}
}
free(cond.start);
free(cond.end);
tSkipListDestroyIter(iter);
......@@ -689,7 +695,8 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
}
if (addToResult) {
taosArrayPush(res, pData);
STableKeyInfo info = {.pTable = *(void**)pData, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(res, &info);
}
}
......@@ -716,7 +723,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
}
tQueryInfo *pQueryInfo = pExpr->_node.info;
if (pQueryInfo->sch.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pQueryInfo->optr != TSDB_RELATION_LIKE) {
if (pQueryInfo->indexed && pQueryInfo->optr != TSDB_RELATION_LIKE) {
tQueryIndexColumn(pSkipList, pQueryInfo, result);
} else {
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
......
此差异已折叠。
......@@ -12,15 +12,14 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "qExtbuffer.h"
#include "os.h"
#include "qExtbuffer.h"
#include "queryLog.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#include "tulog.h"
#include "tutil.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
......
......@@ -6,7 +6,8 @@
#include "queryLog.h"
#include "taoserror.h"
#define GET_DATA_PAYLOAD(_p) ((tFilePage*)(((char*)(_p)->pData) + POINTER_BYTES))
#define GET_DATA_PAYLOAD(_p) ((_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t rowSize, int32_t pagesize,
int32_t inMemBufSize, const void* handle) {
......@@ -25,7 +26,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
pResBuf->comp = true;
pResBuf->file = NULL;
pResBuf->handle = handle;
pResBuf->fileSize = 0;
pResBuf->fileSize = 0;
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
......@@ -34,9 +35,9 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t ro
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
char path[PATH_MAX] = {0};
taosGetTmpfilePath("qbuf", path);
......@@ -186,8 +187,6 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) {
return (char*)GET_DATA_PAYLOAD(pg);
}
#define NO_AVAILABLE_PAGES(_b) ((_b)->numOfPages >= (_b)->inMemPages)
static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL);
......@@ -211,11 +210,12 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId,
pResultBuf->numOfPages += 1;
SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL};
ppi->info = PAGE_INFO_INITIALIZER;
ppi->pageId = pageId;
ppi->pData = NULL;
ppi->pn = NULL;
ppi->used = true;
ppi->pageId = pageId;
ppi->pData = NULL;
ppi->info = PAGE_INFO_INITIALIZER;
ppi->used = true;
ppi->pn = NULL;
return *(SPageInfo**) taosArrayPush(list, &ppi);
}
......@@ -246,7 +246,9 @@ static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) {
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
int32_t prev = pResultBuf->inMemPages;
pResultBuf->inMemPages = (int32_t)(pResultBuf->inMemPages * 1.5f);
// increase by 50% of previous mem pages
pResultBuf->inMemPages = pResultBuf->inMemPages * 1.5f;
qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pResultBuf, prev,
pResultBuf->inMemPages, pResultBuf->pageSize);
......@@ -281,7 +283,7 @@ tFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32
pResultBuf->statis.getPages += 1;
char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf);
}
......@@ -340,7 +342,7 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->info.length >= 0 && (*pi)->info.offset >= 0);
char* availablePage = NULL;
if (NO_AVAILABLE_PAGES(pResultBuf)) {
if (NO_IN_MEM_AVAILABLE_PAGES(pResultBuf)) {
availablePage = evicOneDataPage(pResultBuf);
}
......@@ -353,6 +355,8 @@ tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
((void**)((*pi)->pData))[0] = (*pi);
lruListPushFront(pResultBuf->lruList, *pi);
(*pi)->used = true;
loadPageFromDisk(pResultBuf, *pi);
return GET_DATA_PAYLOAD(*pi);
}
......@@ -396,12 +400,13 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) {
}
if (pResultBuf->file != NULL) {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, pResultBuf->fileSize);
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, inmem size:%dbytes, file size:%"PRId64" bytes",
pResultBuf->handle, pResultBuf->totalBufSize, listNEles(pResultBuf->lruList) * pResultBuf->pageSize,
pResultBuf->fileSize);
fclose(pResultBuf->file);
} else {
qDebug("QInfo:%p disk-based output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
qDebug("QInfo:%p res output buffer closed, total:%" PRId64 " bytes, no file created", pResultBuf->handle,
pResultBuf->totalBufSize);
}
......
......@@ -25,7 +25,7 @@
// All the keywords of the SQL language are stored in a hash table
typedef struct SKeyword {
const char* name; // The keyword name
uint16_t type; // type
uint16_t type; // type
uint8_t len; // length
} SKeyword;
......@@ -257,7 +257,7 @@ static void* KeywordHashTable = NULL;
static void doInitKeywordsTable(void) {
int numOfEntries = tListLen(keywordTable);
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, false);
KeywordHashTable = taosHashInit(numOfEntries, MurmurHash3_32, true, false);
for (int32_t i = 0; i < numOfEntries; i++) {
keywordTable[i].len = (uint8_t)strlen(keywordTable[i].name);
void* ptr = &keywordTable[i];
......
......@@ -72,6 +72,7 @@ STSBuf* tsBufCreateFromFile(const char* path, bool autoDelete) {
// invalid file
if (header.magic != TS_COMP_FILE_MAGIC) {
tsBufDestroy(pTSBuf);
return NULL;
}
......
......@@ -37,7 +37,7 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->type = type;
_hash_fn_t fn = taosGetDefaultHashFunction(type);
pWindowResInfo->hashList = taosHashInit(threshold, fn, false);
pWindowResInfo->hashList = taosHashInit(threshold, fn, true, false);
if (pWindowResInfo->hashList == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
......@@ -46,12 +46,17 @@ int32_t initWindowResInfo(SWindowResInfo *pWindowResInfo, SQueryRuntimeEnv *pRun
pWindowResInfo->size = 0;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
pRuntimeEnv->summary.internalSupSize += sizeof(SWindowResult) * threshold;
// use the pointer arraylist
pWindowResInfo->pResult = calloc(threshold, sizeof(SWindowResult));
if (pWindowResInfo->pResult == NULL) {
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
pRuntimeEnv->summary.internalSupSize += sizeof(SWindowResult) * threshold;
pRuntimeEnv->summary.internalSupSize += (pRuntimeEnv->pQuery->numOfOutput * sizeof(SResultInfo) + pRuntimeEnv->interBufSize) * pWindowResInfo->capacity;
for (int32_t i = 0; i < pWindowResInfo->capacity; ++i) {
int32_t code = createQueryResultInfo(pRuntimeEnv->pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
if (code != TSDB_CODE_SUCCESS) {
......@@ -104,7 +109,7 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR
pWindowResInfo->size = 0;
_hash_fn_t fn = taosGetDefaultHashFunction(pWindowResInfo->type);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, false);
pWindowResInfo->hashList = taosHashInit(pWindowResInfo->capacity, fn, true, false);
pWindowResInfo->startTime = TSKEY_INITIAL_VAL;
pWindowResInfo->prevSKey = TSKEY_INITIAL_VAL;
......
此差异已折叠。
......@@ -130,7 +130,6 @@ void recyclePageTest() {
ASSERT_TRUE(t4 == pBufPage4);
ASSERT_TRUE(pageId == 4);
releaseResBufPage(pResultBuf, t4);
releaseResBufPage(pResultBuf, t4);
tFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId);
tFilePage* t5 = getResBufPage(pResultBuf, pageId);
......
......@@ -47,6 +47,8 @@ void simpleTest() {
EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestroy(pTSBuf);
free(list);
}
// one large list of ts, the ts list need to be split into several small blocks
......@@ -71,6 +73,7 @@ void largeTSTest() {
EXPECT_EQ(pTSBuf->block.numOfElem, num);
tsBufDestroy(pTSBuf);
free(list);
}
void multiTagsTest() {
......@@ -208,6 +211,8 @@ void loadDataTest() {
int64_t e = taosGetTimestampUs();
printf("end:%" PRIu64 ", elapsed:%" PRIu64 ", total obj:%d\n", e, e - s, x);
tsBufDestroy(pTSBuf);
tsBufDestroy(pNewBuf);
}
void randomIncTsTest() {}
......@@ -338,6 +343,8 @@ void TSTraverse() {
}
}
}
tsBufDestroy(pTSBuf);
}
void performanceTest() {}
......@@ -352,9 +359,12 @@ void invalidFileTest() {
STSBuf* pNewBuf = tsBufCreateFromFile("/tmp/test", true);
EXPECT_TRUE(pNewBuf == NULL);
tsBufDestroy(pNewBuf);
pNewBuf = tsBufCreateFromFile("/tmp/911", true);
EXPECT_TRUE(pNewBuf == NULL);
tsBufDestroy(pNewBuf);
}
void mergeDiffVnodeBufferTest() {
......
......@@ -260,7 +260,7 @@ void *rpcOpen(const SRpcInit *pInit) {
}
if (pRpc->connType == TAOS_CONN_SERVER) {
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, true);
if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label);
rpcClose(pRpc);
......
......@@ -96,7 +96,7 @@ static void syncModuleInitFunc() {
return;
}
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
vgIdHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (vgIdHash == NULL) {
taosTmrCleanUp(syncTmrCtrl);
taosCloseTcpThreadPool(tsTcpPool);
......
......@@ -151,22 +151,22 @@ typedef struct {
// ------------------ tsdbFile.c
extern const char* tsdbFileSuffix[];
typedef enum {
#ifdef TSDB_IDX
TSDB_FILE_TYPE_IDX = 0,
TSDB_FILE_TYPE_HEAD,
#else
TSDB_FILE_TYPE_HEAD = 0,
#endif
TSDB_FILE_TYPE_DATA,
TSDB_FILE_TYPE_LAST,
TSDB_FILE_TYPE_MAX,
#ifdef TSDB_IDX
TSDB_FILE_TYPE_NIDX,
#endif
TSDB_FILE_TYPE_STAT,
TSDB_FILE_TYPE_NHEAD,
TSDB_FILE_TYPE_NLAST
TSDB_FILE_TYPE_NDATA,
TSDB_FILE_TYPE_NLAST,
TSDB_FILE_TYPE_NSTAT
} TSDB_FILE_TYPE;
#ifndef TDINTERNAL
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_LAST+1)
#else
#define TSDB_FILE_TYPE_MAX (TSDB_FILE_TYPE_STAT+1)
#endif
typedef struct {
uint32_t magic;
uint32_t len;
......@@ -281,9 +281,6 @@ typedef struct {
TSKEY minKey;
TSKEY maxKey;
SFileGroup fGroup;
#ifdef TSDB_IDX
SFile nIdxF;
#endif
SFile nHeadF;
SFile nLastF;
} SHelperFile;
......@@ -497,10 +494,6 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
#define helperState(h) (h)->state
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
#define helperFileId(h) ((h)->files.fGroup.fileId)
#ifdef TSDB_IDX
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
#define helperNewIdxF(h) (&((h)->files.nIdxF))
#endif
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
......@@ -512,7 +505,7 @@ int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
void tsdbDestroyHelper(SRWHelper* pHelper);
void tsdbResetHelper(SRWHelper* pHelper);
int tsdbSetAndOpenHelperFile(SRWHelper* pHelper, SFileGroup* pGroup);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError);
int tsdbCloseHelperFile(SRWHelper* pHelper, bool hasError, SFileGroup* pGroup);
int tsdbSetHelperTable(SRWHelper* pHelper, STable* pTable, STsdbRepo* pRepo);
int tsdbCommitTableData(SRWHelper* pHelper, SCommitIter* pCommitIter, SDataCols* pDataCols, TSKEY maxKey);
int tsdbMoveLastBlockIfNeccessary(SRWHelper* pHelper);
......
......@@ -21,11 +21,7 @@
#define TAOS_RANDOM_FILE_FAIL_TEST
#ifdef TSDB_IDX
const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"};
#else
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
#endif
const char *tsdbFileSuffix[] = {".head", ".data", ".last", ".stat", ".h", ".d", ".l", ".s"};
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
static void tsdbDestroyFile(SFile *pFile);
......@@ -413,6 +409,10 @@ static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type) {
pBuf = taosDecodeFixedU32(pBuf, &version);
pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info));
if (pFile->info.size == TSDB_FILE_HEAD_SIZE) {
pFile->info.size = lseek(pFile->fd, 0, SEEK_END);
}
if (version != TSDB_FILE_VERSION) {
tsdbError("vgId:%d file %s version %u is not the same as program version %u which may cause problem",
REPO_ID(pRepo), pFile->fname, version, TSDB_FILE_VERSION);
......
......@@ -679,15 +679,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
}
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 0);
tsdbCloseHelperFile(pHelper, 0, pGroup);
pthread_rwlock_wrlock(&(pFileH->fhlock));
#ifdef TSDB_IDX
rename(helperNewIdxF(pHelper)->fname, helperIdxF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_IDX].info = helperNewIdxF(pHelper)->info;
#endif
rename(helperNewHeadF(pHelper)->fname, helperHeadF(pHelper)->fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info;
......@@ -706,7 +701,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
_err:
taosTFree(dataDir);
tsdbCloseHelperFile(pHelper, 1);
tsdbCloseHelperFile(pHelper, 1, NULL);
return -1;
}
......
......@@ -443,7 +443,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
goto _err;
}
pMeta->uidMap = taosHashInit((size_t)(TSDB_INIT_NTABLES * 1.1), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pMeta->uidMap = taosHashInit((size_t)(TSDB_INIT_NTABLES * 1.1), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pMeta->uidMap == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto _err;
......
......@@ -91,7 +91,7 @@ void tsdbResetHelper(SRWHelper *pHelper) {
tsdbResetHelperTableImpl(pHelper);
// Reset the file part
tsdbCloseHelperFile(pHelper, false);
tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper);
pHelper->state = TSDB_HELPER_CLEAR_STATE;
......@@ -110,31 +110,16 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
// Set the files
pHelper->files.fGroup = *pGroup;
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
#ifdef TSDB_IDX
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname);
#endif
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname);
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname);
}
// Open the files
#ifdef TSDB_IDX
if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) return -1;
#endif
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) return -1;
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) return -1;
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) return -1;
#ifdef TSDB_IDX
// Create and open .i file
pFile = helperNewIdxF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
pFile->info.size = TSDB_FILE_HEAD_SIZE;
pFile->info.magic = TSDB_FILE_INIT_MAGIC;
if (tsdbUpdateFileHeader(pFile) < 0) return -1;
#endif
// Create and open .h
pFile = helperNewHeadF(pHelper);
if (tsdbOpenFile(pFile, O_WRONLY | O_CREAT) < 0) return -1;
......@@ -161,14 +146,9 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
return 0;
}
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError, SFileGroup *pGroup) {
SFile *pFile = NULL;
#ifdef TSDB_IDX
pFile = helperIdxF(pHelper);
tsdbCloseFile(pFile);
#endif
pFile = helperHeadF(pHelper);
tsdbCloseFile(pFile);
......@@ -177,10 +157,11 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
if (!hasError) {
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_DATA].info.size);
}
fsync(pFile->fd);
}
tsdbCloseFile(pFile);
}
......@@ -190,27 +171,16 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
if (!hasError) {
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
} else {
// TODO: shrink back to origin
ASSERT(pGroup != NULL);
taosFtruncate(pFile->fd, pGroup->files[TSDB_FILE_TYPE_LAST].info.size);
}
fsync(pFile->fd);
}
tsdbCloseFile(pFile);
}
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
#ifdef TSDB_IDX
pFile = helperNewIdxF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
tsdbUpdateFileHeader(pFile);
fsync(pFile->fd);
}
tsdbCloseFile(pFile);
if (hasError) (void)remove(pFile->fname);
}
#endif
pFile = helperNewHeadF(pHelper);
if (pFile->fd > 0) {
if (!hasError) {
......@@ -412,10 +382,6 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
return -1;
}
#ifdef TSDB_IDX
pFile = helperNewIdxF(pHelper);
#endif
if (taosTSizeof(pHelper->pWIdx) < pFile->info.len + sizeof(SCompIdx) + 12) {
pHelper->pWIdx = taosTRealloc(pHelper->pWIdx, taosTSizeof(pHelper->pWIdx) == 0 ? 1024 : taosTSizeof(pHelper->pWIdx) * 2);
if (pHelper->pWIdx == NULL) {
......@@ -426,6 +392,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
void *pBuf = POINTER_SHIFT(pHelper->pWIdx, pFile->info.len);
pFile->info.len += tsdbEncodeSCompIdx(&pBuf, &(pHelper->curCompIdx));
pFile->info.size += pIdx->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
}
return 0;
......@@ -435,11 +404,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
off_t offset = 0;
#ifdef TSDB_IDX
SFile *pFile = helperNewIdxF(pHelper);
#else
SFile *pFile = helperNewHeadF(pHelper);
#endif
pFile->info.len += sizeof(TSCKSUM);
if (taosTSizeof(pHelper->pWIdx) < pFile->info.len) {
......@@ -460,7 +425,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1;
}
pFile->info.offset = offset;
ASSERT(offset == pFile->info.size);
if (taosTWrite(pFile->fd, (void *)pHelper->pWIdx, pFile->info.len) < (int)pFile->info.len) {
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
......@@ -469,16 +434,16 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
return -1;
}
pFile->info.offset = offset;
pFile->info.size += pFile->info.len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0;
}
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
ASSERT(pHelper->state == TSDB_HELPER_FILE_SET_AND_OPEN);
#ifdef TSDB_IDX
SFile *pFile = helperIdxF(pHelper);
#else
SFile *pFile = helperHeadF(pHelper);
#endif
int fd = pFile->fd;
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
......@@ -847,6 +812,9 @@ static int tsdbWriteBlockToFile(SRWHelper *pHelper, SFile *pFile, SDataCols *pDa
(int)(pCompBlock->numOfRows), pCompBlock->len, pCompBlock->numOfCols, pCompBlock->keyFirst,
pCompBlock->keyLast);
pFile->info.size += pCompBlock->len;
// ASSERT(pFile->info.size == lseek(pFile->fd, 0, SEEK_CUR));
return 0;
_err:
......@@ -1052,10 +1020,6 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
helperLastF(pHelper)->fd = -1;
helperNewHeadF(pHelper)->fd = -1;
helperNewLastF(pHelper)->fd = -1;
#ifdef TSDB_IDX
helperIdxF(pHelper)->fd = -1;
helperNewIdxF(pHelper)->fd = -1;
#endif
}
static int tsdbInitHelperFile(SRWHelper *pHelper) {
......@@ -1064,7 +1028,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
}
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
tsdbCloseHelperFile(pHelper, false);
tsdbCloseHelperFile(pHelper, false, NULL);
tsdbResetHelperFileImpl(pHelper);
taosTZfree(pHelper->idxH.pIdxArray);
taosTZfree(pHelper->pWIdx);
......
......@@ -172,6 +172,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
if (pQueryHandle == NULL) {
goto out_of_memory;
}
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
......@@ -183,6 +184,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
pQueryHandle->qinfo = qinfo;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false;
if (tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb) != 0) {
goto out_of_memory;
......@@ -193,6 +195,12 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
size_t sizeOfGroup = taosArrayGetSize(groupList->pGroupList);
assert(sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
// allocate buffer in order to load data blocks from file
int32_t numOfCols = pCond->numOfCols;
......@@ -200,6 +208,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
if (pQueryHandle->statis == NULL) {
goto out_of_memory;
}
pQueryHandle->pColumns = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
if (pQueryHandle->pColumns == NULL) {
goto out_of_memory;
......@@ -221,8 +230,9 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
if (pQueryHandle->pTableCheckInfo == NULL) {
goto out_of_memory;
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
assert(pMeta != NULL && sizeOfGroup >= 1 && pCond != NULL && pCond->numOfCols > 0);
for (int32_t i = 0; i < sizeOfGroup; ++i) {
SArray* group = *(SArray**) taosArrayGet(groupList->pGroupList, i);
......@@ -231,19 +241,29 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
assert(gsize > 0);
for (int32_t j = 0; j < gsize; ++j) {
STable* pTable = (STable*) taosArrayGetP(group, j);
STableKeyInfo* pKeyInfo = (STableKeyInfo*) taosArrayGet(group, j);
STableCheckInfo info = {
.lastKey = pQueryHandle->window.skey,
//.tableId = pTable->tableId,
.pTableObj = pTable,
.lastKey = pKeyInfo->lastKey,
.tableId = ((STable*)(pKeyInfo->pTable))->tableId,
.pTableObj = pKeyInfo->pTable,
};
info.tableId = pTable->tableId;
assert(info.pTableObj != NULL && (info.pTableObj->type == TSDB_NORMAL_TABLE ||
info.pTableObj->type == TSDB_CHILD_TABLE || info.pTableObj->type == TSDB_STREAM_TABLE));
info.tableId.tid = info.pTableObj->tableId.tid;
info.tableId.uid = info.pTableObj->tableId.uid;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(info.lastKey >= pQueryHandle->window.skey);
} else {
assert(info.lastKey <= pQueryHandle->window.skey);
}
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
tsdbDebug("%p check table uid:%"PRId64", tid:%d from lastKey:%"PRId64" %p", pQueryHandle, info.tableId.uid,
info.tableId.tid, info.lastKey, qinfo);
}
}
......@@ -316,17 +336,23 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
// TODO: add uid check
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables &&
pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
STableData* pMem = NULL;
STableData* pIMem = NULL;
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables) {
pMem = pHandle->mem->tData[pCheckInfo->tableId.tid];
if (pMem != NULL && pMem->uid == pCheckInfo->tableId.uid) { // check uid
pCheckInfo->iter =
tSkipListCreateIterFromVal(pMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
}
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables &&
pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables) {
pIMem = pHandle->imem->tData[pCheckInfo->tableId.tid];
if (pIMem != NULL && pIMem->uid == pCheckInfo->tableId.uid) { // check uid
pCheckInfo->iiter =
tSkipListCreateIterFromVal(pIMem->pData, (const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
}
}
// both iterators are NULL, no data in buffer right now
......@@ -346,8 +372,17 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in mem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast,
pCheckInfo->lastKey, pHandle->qinfo);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
} else {
assert(pCheckInfo->lastKey >= key);
}
} else {
tsdbDebug("%p uid:%"PRId64", tid:%d no data in mem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
......@@ -359,8 +394,16 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
TSKEY key = dataRowKey(row); // first timestamp in buffer
tsdbDebug("%p uid:%" PRId64", tid:%d check data in imem from skey:%" PRId64 ", order:%d, %p", pHandle,
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pHandle->qinfo);
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast,
pCheckInfo->lastKey, pHandle->qinfo);
if (ASCENDING_TRAVERSE(order)) {
assert(pCheckInfo->lastKey <= key);
} else {
assert(pCheckInfo->lastKey >= key);
}
} else {
tsdbDebug("%p uid:%"PRId64", tid:%d no data in imem, %p", pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid,
pHandle->qinfo);
......@@ -1072,6 +1115,14 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
TSKEY* tsArray = pCols->cols[0].pData;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
TSKEY s = tsArray[cur->pos];
assert(s >= pQueryHandle->window.skey && s <= pQueryHandle->window.ekey);
} else {
TSKEY s = tsArray[cur->pos];
assert(s <= pQueryHandle->window.skey && s >= pQueryHandle->window.ekey);
}
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
......@@ -1551,7 +1602,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
// current block is done, try next
if (!cur->mixBlock || cur->blockCompleted) {
if ((!cur->mixBlock) || cur->blockCompleted) {
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// all data blocks in current file has been checked already, try next file if exists
......@@ -1570,6 +1621,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
return TSDB_CODE_SUCCESS;
}
} else {
tsdbDebug("%p continue in current data block, index:%d, %p", pQueryHandle, cur->slot, pQueryHandle->qinfo);
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
*exists = pQueryHandle->realNumOfRows > 0;
......@@ -2032,7 +2084,9 @@ static int32_t getAllTableList(STable* pSuperTable, SArray* list) {
SSkipListNode* pNode = tSkipListIterGet(iter);
STable** pTable = (STable**) SL_GET_NODE_DATA((SSkipListNode*) pNode);
taosArrayPush(list, pTable);
STableKeyInfo info = {.pTable = *pTable, .lastKey = TSKEY_INITIAL_VAL};
taosArrayPush(list, &info);
}
tSkipListDestroyIter(iter);
......@@ -2070,7 +2124,7 @@ void filterPrepare(void* expr, void* param) {
pInfo->sch = *pSchema;
pInfo->optr = pExpr->_node.optr;
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
pInfo->param = pTSSchema;
pInfo->indexed = pTSSchema->columns->colId == pInfo->sch.colId;
if (pInfo->optr == TSDB_RELATION_IN) {
pInfo->q = (char*) pCond->arr;
......@@ -2088,8 +2142,8 @@ typedef struct STableGroupSupporter {
int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
STableGroupSupporter* pTableGroupSupp = (STableGroupSupporter*) param;
STable* pTable1 = *(STable**) p1;
STable* pTable2 = *(STable**) p2;
STable* pTable1 = ((STableKeyInfo*) p1)->pTable;
STable* pTable2 = ((STableKeyInfo*) p2)->pTable;
for (int32_t i = 0; i < pTableGroupSupp->numOfCols; ++i) {
SColIndex* pColIndex = &pTableGroupSupp->pCols[i];
......@@ -2139,12 +2193,14 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) {
return 0;
}
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, STableGroupSupporter* pSupp,
void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTables, TSKEY skey, STableGroupSupporter* pSupp,
__ext_compar_fn_t compareFn) {
STable* pTable = taosArrayGetP(pTableList, 0);
SArray* g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, &pTable);
SArray* g = taosArrayInit(16, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pTable, .lastKey = skey};
taosArrayPush(g, &info);
tsdbRefTable(pTable);
for (int32_t i = 1; i < numOfTables; ++i) {
......@@ -2158,18 +2214,21 @@ void createTableGroupImpl(SArray* pGroups, SArray* pTableList, size_t numOfTable
assert((*p)->type == TSDB_CHILD_TABLE);
if (ret == 0) {
taosArrayPush(g, p);
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
taosArrayPush(g, &info1);
} else {
taosArrayPush(pGroups, &g); // current group is ended, start a new group
g = taosArrayInit(16, POINTER_BYTES);
taosArrayPush(g, p);
g = taosArrayInit(16, sizeof(STableKeyInfo));
STableKeyInfo info1 = {.pTable = *p, .lastKey = skey};
taosArrayPush(g, &info1);
}
}
taosArrayPush(pGroups, &g);
}
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols) {
SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pCols, int32_t numOfOrderCols, TSKEY skey) {
assert(pTableList != NULL);
SArray* pTableGroup = taosArrayInit(1, POINTER_BYTES);
......@@ -2180,13 +2239,16 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
}
if (numOfOrderCols == 0 || size == 1) { // no group by tags clause or only one table
SArray* sa = taosArrayInit(size, POINTER_BYTES);
SArray* sa = taosArrayInit(size, sizeof(STableKeyInfo));
for(int32_t i = 0; i < size; ++i) {
STable** pTable = taosArrayGet(pTableList, i);
assert((*pTable)->type == TSDB_CHILD_TABLE);
STableKeyInfo *pKeyInfo = taosArrayGet(pTableList, i);
assert(((STable*)pKeyInfo->pTable)->type == TSDB_CHILD_TABLE);
tsdbRefTable(*pTable);
taosArrayPush(sa, pTable);
tsdbRefTable(pKeyInfo->pTable);
STableKeyInfo info = {.pTable = pKeyInfo->pTable, .lastKey = skey};
taosArrayPush(sa, &info);
}
taosArrayPush(pTableGroup, &sa);
......@@ -2197,8 +2259,8 @@ SArray* createTableGroup(SArray* pTableList, STSchema* pTagSchema, SColIndex* pC
pSupp->pTagSchema = pTagSchema;
pSupp->pCols = pCols;
taosqsort(pTableList->pData, size, POINTER_BYTES, pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, pSupp, tableGroupComparFn);
taosqsort(pTableList->pData, size, sizeof(STableKeyInfo), pSupp, tableGroupComparFn);
createTableGroupImpl(pTableGroup, pTableList, size, skey, pSupp, tableGroupComparFn);
taosTFree(pSupp);
}
......@@ -2271,7 +2333,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
return TSDB_CODE_SUCCESS;
}
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pTagCond, size_t len,
int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
......@@ -2295,7 +2357,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
}
//NOTE: not add ref count for super table
SArray* res = taosArrayInit(8, POINTER_BYTES);
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
STSchema* pTagSchema = tsdbGetTableTagSchema(pTable);
// no tags and tbname condition, all child tables of this stable are involved
......@@ -2307,7 +2369,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
}
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p no table name/tag condition, all tables belong to one group, numOfTables:%" PRIzu "", tsdb, pGroupInfo->numOfTables);
taosArrayDestroy(res);
......@@ -2350,7 +2412,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
doQueryTableList(pTable, res, expr);
pGroupInfo->numOfTables = taosArrayGetSize(res);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
tsdbDebug("%p stable tid:%d, uid:%"PRIu64" query, numOfTables:%" PRIzu ", belong to %" PRIzu " groups", tsdb, pTable->tableId.tid,
pTable->tableId.uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
......@@ -2364,7 +2426,7 @@ int32_t tsdbQuerySTableByTagCond(TSDB_REPO_T* tsdb, uint64_t uid, const char* pT
return terrno;
}
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* pGroupInfo) {
int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
if (tsdbRLockRepoMeta(tsdb) < 0) goto _error;
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(tsdb), uid);
......@@ -2381,9 +2443,11 @@ int32_t tsdbGetOneTableGroup(TSDB_REPO_T* tsdb, uint64_t uid, STableGroupInfo* p
pGroupInfo->numOfTables = 1;
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
STableKeyInfo info = {.pTable = pTable, .lastKey = startKey};
taosArrayPush(group, &info);
taosArrayPush(group, &pTable);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
......@@ -2400,7 +2464,7 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
assert(pTableIdList != NULL);
size_t size = taosArrayGetSize(pTableIdList);
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, POINTER_BYTES);
SArray* group = taosArrayInit(1, sizeof(STableKeyInfo));
int32_t i = 0;
for(; i < size; ++i) {
......@@ -2418,7 +2482,9 @@ int32_t tsdbGetTableGroupFromIdList(TSDB_REPO_T* tsdb, SArray* pTableIdList, STa
}
tsdbRefTable(pTable);
taosArrayPush(group, &pTable);
STableKeyInfo info = {.pTable = pTable, .lastKey = id->key};
taosArrayPush(group, &info);
}
if (tsdbUnlockRepoMeta(tsdb) < 0) {
......
......@@ -20,7 +20,9 @@
extern "C" {
#endif
#include "tarray.h"
#include "hashfunc.h"
#include "tlockfree.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
......@@ -29,38 +31,45 @@ extern "C" {
typedef void (*_hash_free_fn_t)(void *param);
typedef struct SHashNode {
char *key;
// union {
struct SHashNode * prev;
// struct SHashEntry *prev1;
// };
//
char *key;
// struct SHashNode *prev;
struct SHashNode *next;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t keyLen; // length of the key
char data[];
char *data;
} SHashNode;
typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0,
HASH_ENTRY_LOCK = 1,
} SHashLockTypeE;
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode *next;
} SHashEntry;
typedef struct SHashObj {
SHashNode **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
#if defined(LINUX)
pthread_rwlock_t *lock;
#else
pthread_mutex_t *lock;
#endif
SHashEntry **hashList;
size_t capacity; // number of slots
size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function
SRWLatch lock; // read-write spin lock
SHashLockTypeE type; // lock type
bool enableUpdate; // enable update
SArray *pMemBlock; // memory block allocated for SHashEntry
} SHashObj;
typedef struct SHashMutableIterator {
SHashObj * pHashObj;
SHashObj *pHashObj;
int32_t entryIndex;
SHashNode *pCur;
SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
int32_t num; // already check number of elements in hash table
SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
size_t numOfChecked; // already check number of elements in hash table
size_t numOfEntries; // number of entries while the iterator is created
} SHashMutableIterator;
/**
......@@ -71,7 +80,7 @@ typedef struct SHashMutableIterator {
* @param threadsafe thread safe or not
* @return
*/
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe);
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type);
/**
* return the size of hash table
......@@ -101,13 +110,19 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
*/
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void(*fp)(void*));
/**
* remove item with the specified key
* @param pHashObj
* @param key
* @param keyLen
*/
void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize);
int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param);
/**
* clean up hash table
......@@ -115,13 +130,6 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
*/
void taosHashCleanup(SHashObj *pHashObj);
/**
* Set the free callback function
* This function if set will be invoked right before freeing each hash node
* @param pHashObj
*/
void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp);
/**
*
* @param pHashObj
......
......@@ -121,7 +121,7 @@ void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen
* @param expireTime new expire time of data
* @return
*/
void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime);
//void* taosCacheUpdateExpireTimeByName(SCacheObj *pCacheObj, void *key, size_t keyLen, uint64_t expireTime);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
......
......@@ -24,8 +24,6 @@ extern "C" {
#include "tutil.h"
#include "ttokendef.h"
#define TSQL_TBNAME "TBNAME"
#define TSQL_TBNAME_L "tbname"
......
此差异已折叠。
此差异已折叠。
......@@ -433,7 +433,7 @@ static SKVStore *tdNewKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void
pStore->iFunc = iFunc;
pStore->aFunc = aFunc;
pStore->appH = appH;
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
pStore->map = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pStore->map == NULL) {
terrno = TSDB_CODE_COM_OUT_OF_MEMORY;
goto _err;
......
......@@ -10,6 +10,6 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
INCLUDE_DIRECTORIES(${HEADER_GTEST_INCLUDE_DIR})
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
ADD_EXECUTABLE(utilTest ./cacheTest.cpp ./hashTest.cpp)
TARGET_LINK_LIBRARIES(utilTest tutil common osdetail gtest pthread gcov)
ENDIF()
......@@ -10,7 +10,7 @@
namespace {
// the simple test code for basic operations
void simpleTest() {
auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0);
// put 400 elements in the hash table
......@@ -47,7 +47,7 @@ void simpleTest() {
}
void stringKeyTest() {
auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0);
char key[128] = {0};
......@@ -97,7 +97,7 @@ void functionTest() {
* a single threads situation
*/
void noLockPerformanceTest() {
auto* hashTable = (SHashObj*) taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
auto* hashTable = (SHashObj*) taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0);
char key[128] = {0};
......
......@@ -50,7 +50,7 @@ int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true);
tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (tsDnodeVnodesHash == NULL) {
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
......@@ -251,10 +251,17 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
appH.cqCreateFunc = cqCreate;
appH.cqDropFunc = cqDrop;
sprintf(temp, "%s/tsdb", rootDir);
terrno = 0;
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
if (pVnode->tsdb == NULL) {
vnodeCleanUp(pVnode);
return terrno;
} else if (terrno != 0 && pVnode->syncCfg.replica <= 1) {
vError("vgId:%d, failed to open tsdb, replica:%d reason:%s", pVnode->vgId, pVnode->syncCfg.replica,
tstrerror(terrno));
vnodeCleanUp(pVnode);
return terrno;
}
sprintf(temp, "%s/wal", rootDir);
......
......@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include <dnode.h>
//#include <dnode.h>
#include "os.h"
#include "tglobal.h"
......@@ -66,7 +66,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
......@@ -74,22 +74,23 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
pRead->rpcMsg.handle = NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void* handle, bool* freeHandle) {
static int32_t vnodeDumpQueryResult(SRspRet *pRet, void* pVnode, void** handle, bool* freeHandle) {
bool continueExec = false;
int32_t code = TSDB_CODE_SUCCESS;
if ((code = qDumpRetrieveResult(handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == TSDB_CODE_SUCCESS) {
if (continueExec) {
vDebug("QInfo:%p add to query task queue for exec", handle);
vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = handle;
*freeHandle = false;
vnodePutItemIntoReadQueue(pVnode, handle);
pRet->qhandle = *handle;
} else {
vDebug("QInfo:%p exec completed", handle);
*freeHandle = true;
vDebug("QInfo:%p exec completed, free handle:%d", *handle, *freeHandle);
}
} else {
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
......@@ -158,7 +159,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
// current connect is broken
if (code == TSDB_CODE_SUCCESS) {
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
if (handle == NULL) { // failed to register qhandle
if (handle == NULL) { // failed to register qhandle, todo add error test case
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
tstrerror(pRsp->code));
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
......@@ -179,41 +180,40 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
}
if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
vnodePutItemIntoReadQueue(pVnode, handle);
}
} else {
assert(pCont != NULL);
void** qhandle = (void**) pCont;
handle = qAcquireQInfo(pVnode->qMgmt, (uint64_t) pCont);
if (handle == NULL) {
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_QRY_INVALID_QHANDLE;
} else {
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, *qhandle);
bool freehandle = false;
bool buildRes = qTableQuery(*handle); // do execute query
bool freehandle = false;
bool buildRes = qTableQuery(*qhandle); // do execute query
// build query rsp
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
assert(pReadMsg->rpcMsg.handle != NULL);
// build query rsp, the retrieve request has reached here already
if (buildRes) {
// update the connection info according to the retrieve connection
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*qhandle);
assert(pReadMsg->rpcMsg.handle != NULL);
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle,
pReadMsg->rpcMsg.handle);
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, qhandle, &freehandle);
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
}
// todo test the error code case
if (code == TSDB_CODE_SUCCESS) {
code = TSDB_CODE_QRY_HAS_RSP;
}
} else {
freehandle = qQueryCompleted(*qhandle);
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle);
// NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle.
// if not build result, free it not by forced.
if (freehandle || (!buildRes)) {
qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle);
}
}
......@@ -225,8 +225,8 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
SRspRet *pRet = &pReadMsg->rspRet;
SRetrieveTableMsg *pRetrieve = pCont;
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
pRetrieve->free = htons(pRetrieve->free);
pRetrieve->qhandle = htobe64(pRetrieve->qhandle);
vDebug("vgId:%d, QInfo:%p, retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, (void*) pRetrieve->qhandle, pRetrieve->free, pReadMsg->rpcMsg.handle);
......@@ -236,18 +236,27 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qhandle);
if (handle == NULL || (*handle) != (void*) pRetrieve->qhandle) {
code = TSDB_CODE_QRY_INVALID_QHANDLE;
vDebug("vgId:%d, invalid qhandle in fetch result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
vDebug("vgId:%d, invalid qhandle in retrieving result, QInfo:%p", pVnode->vgId, (void*) pRetrieve->qhandle);
vnodeBuildNoResultQueryRsp(pRet);
return code;
}
if (pRetrieve->free == 1) {
vDebug("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
vnodeBuildNoResultQueryRsp(pRet);
code = TSDB_CODE_TSC_QUERY_CANCELLED;
return code;
}
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%p, retrieve discarded since link is broken, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
return code;
}
......@@ -259,16 +268,22 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
//TODO handle malloc failure
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
freeHandle = true;
} else { // result is not ready, return immediately
if (!buildRes) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
return TSDB_CODE_QRY_NOT_READY;
}
code = vnodeDumpQueryResult(pRet, pVnode, *handle, &freeHandle);
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle);
}
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
// Here free qhandle immediately
if (freeHandle) {
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, true);
}
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freeHandle);
return code;
}
......
......@@ -63,7 +63,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>1.0.1</version>
<version>2.0.2</version>
</dependency>
</dependencies>
</project>
......@@ -41,7 +41,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>1.0.3</version>
<version>2.0.2</version>
</dependency>
</dependencies>
......
......@@ -63,7 +63,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>1.0.3</version>
<version>2.0.2</version>
</dependency>
<dependency>
......
#!/bin/bash
DATA_DIR=/mnt/root/testdata
NUM_LOOP=1
NUM_OF_FILES=100
rowsPerRequest=(1 100 500 1000 2000)
function printTo {
if $verbose ; then
echo $1
fi
}
function runTest {
printf "R/R, "
for c in `seq 1 $clients`; do
if [ "$c" == "1" ]; then
printf "$c client, "
else
printf "$c clients, "
fi
done
printf "\n"
for r in ${rowsPerRequest[@]}; do
printf "$r, "
for c in `seq 1 $clients`; do
totalRPR=0
OUTPUT_FILE=influxdbTestWrite-RPR$r-clients$c.out
for i in `seq 1 $NUM_LOOP`; do
printTo "loop i:$i, $INF_TEST_DIR/influxdbTest \
-dataDir $DATA_DIR \
-numOfFiles $NUM_OF_FILES \
-writeClients $c \
-rowsPerRequest $r"
$INF_TEST_DIR/influxdbTest \
-dataDir $DATA_DIR \
-numOfFiles $NUM_OF_FILES \
-writeClients $c \
-rowsPerRequest $r 2>&1 \
| tee $OUTPUT_FILE
RPR=`cat $OUTPUT_FILE | grep speed | awk '{print $(NF-1)}'`
totalRPR=`echo "scale=4; $totalRPR + $RPR" | bc`
printTo "rows:$r, clients:$c, i:$i RPR:$RPR"
done
avgRPR=`echo "scale=4; $totalRPR / $NUM_LOOP" | bc`
printf "$avgRPR, "
done
printf "\n"
done
}
################ Main ################
verbose=false
clients=1
while : ; do
case $1 in
-v)
verbose=true
shift ;;
-n)
NUM_LOOP=$2
shift 2;;
-c)
clients=$2
shift 2;;
*)
break ;;
esac
done
WORK_DIR=/mnt/root/TDengine
INF_TEST_DIR=$WORK_DIR/tests/comparisonTest/influxdb
runTest
echo "Test done!"
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c debugFlag -v 135
system sh/cfg.sh -n dnode1 -c rpcDebugFlag -v 135
system sh/exec.sh -n dnode1 -s start
......@@ -28,7 +28,7 @@ $mt = $mtPrefix . $i
sql drop database if exits $db -x step1
step1:
sql create database if not exists $db tables 4 keep 36500
sql create database if not exists $db maxtables 4 keep 36500
sql use $db
sql create table $mt (ts timestamp, c1 int, c2 float, c3 bigint, c4 smallint, c5 tinyint, c6 double, c7 bool, c8 binary(10), c9 nchar(9)) TAGS(t1 int, t2 binary(12))
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册