提交 48c0805f 编写于 作者: Y yihaoDeng

Merge branch 'develop' of https://github.com/taosdata/TDengine into develop

......@@ -179,19 +179,20 @@ taos> select avg(f1), max(f2), min(f3) from test.t10 interval(10s);
### TDengine服务器支持的平台列表
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● |
| 树莓派 ARM32 | | ● | ● | | | |
| 龙芯 MIPS64 | | | ● | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | |
| 申威 Alpha64 | | | ○ | ● | | |
| 飞腾 ARM64 | | ○ 优麒麟 | | | | |
| 海光 X64 | ● | ● | ● | ○ | ● | ● |
| 瑞芯微 ARM64/32 | | | ○ | | | |
| 全志 ARM64/32 | | | ○ | | | |
| 炬力 ARM64/32 | | | ○ | | | |
| TI ARM32 | | | ○ | | | |
| | **CentOS 6/7/8** | **Ubuntu 16/18/20** | **Other Linux** | **统信 UOS** | **银河/中标麒麟** | **凝思 V60/V80** | **华为 EulerOS** |
| -------------- | --------------------- | ------------------------ | --------------- | --------------- | ------------------------- | --------------------- | --------------------- |
| X64 | ● | ● | | ○ | ● | ● | ● |
| 树莓派 ARM32 | | ● | ● | | | | |
| 龙芯 MIPS64 | | | ● | | | | |
| 鲲鹏 ARM64 | | ○ | ○ | | ● | | |
| 申威 Alpha64 | | | ○ | ● | | | |
| 飞腾 ARM64 | | ○ 优麒麟 | | | | | |
| 海光 X64 | ● | ● | ● | ○ | ● | ● | |
| 瑞芯微 ARM64/32 | | | ○ | | | | |
| 全志 ARM64/32 | | | ○ | | | | |
| 炬力 ARM64/32 | | | ○ | | | | |
| TI ARM32 | | | ○ | | | | |
| 华为云 ARM64 | | | | | | | ● |
注: ● 表示经过官方测试验证, ○ 表示非官方测试验证。
......
......@@ -301,7 +301,7 @@ static void tscAsyncResultCallback(SSchedMsg *pMsg) {
taosReleaseRef(tscObjRef, pSql->self);
}
void tscAsyncResultOnError(SSqlObj* pSql) {
void tscAsyncResultOnError(SSqlObj* pSql) {
SSchedMsg schedMsg = {0};
schedMsg.fp = tscAsyncResultCallback;
schedMsg.ahandle = (void *)pSql->self;
......@@ -505,10 +505,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
return;
_error:
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscAsyncResultOnError(pSql);
}
pRes->code = code;
tscAsyncResultOnError(pSql);
taosReleaseRef(tscObjRef, pSql->self);
}
......@@ -3293,7 +3293,8 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
}
if (pSchema->type == TSDB_DATA_TYPE_BOOL) {
if (pExpr->tokenId != TK_EQ && pExpr->tokenId != TK_NE) {
int32_t t = pExpr->tokenId;
if (t != TK_EQ && t != TK_NE && t != TK_NOTNULL && t != TK_ISNULL) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
......@@ -3493,7 +3494,8 @@ static int32_t validateSQLExpr(SSqlCmd* pCmd, tSqlExpr* pExpr, SQueryInfo* pQuer
}
pList->ids[pList->num++] = index;
} else if (pExpr->tokenId == TK_FLOAT && (isnan(pExpr->value.dKey) || isinf(pExpr->value.dKey))) {
} else if ((pExpr->tokenId == TK_FLOAT && (isnan(pExpr->value.dKey) || isinf(pExpr->value.dKey))) ||
pExpr->tokenId == TK_NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
} else if (pExpr->type == SQL_NODE_SQLFUNCTION) {
if (*type == NON_ARITHMEIC_EXPR) {
......@@ -3727,6 +3729,39 @@ static int32_t setExprToCond(tSqlExpr** parent, tSqlExpr* pExpr, const char* msg
return TSDB_CODE_SUCCESS;
}
static int32_t validateNullExpr(tSqlExpr* pExpr, char* msgBuf) {
const char* msg = "only support is [not] null";
tSqlExpr* pRight = pExpr->pRight;
if (pRight->tokenId == TK_NULL && (!(pExpr->tokenId == TK_ISNULL || pExpr->tokenId == TK_NOTNULL))) {
return invalidSqlErrMsg(msgBuf, msg);
}
return TSDB_CODE_SUCCESS;
}
// check for like expression
static int32_t validateLikeExpr(tSqlExpr* pExpr, STableMeta* pTableMeta, int32_t index, char* msgBuf) {
const char* msg1 = "wildcard string should be less than 20 characters";
const char* msg2 = "illegal column name";
tSqlExpr* pLeft = pExpr->pLeft;
tSqlExpr* pRight = pExpr->pRight;
if (pExpr->tokenId == TK_LIKE) {
if (pRight->value.nLen > TSDB_PATTERN_STRING_MAX_LEN) {
return invalidSqlErrMsg(msgBuf, msg1);
}
SSchema* pSchema = tscGetTableSchema(pTableMeta);
if ((!isTablenameToken(&pLeft->colInfo)) && !IS_VAR_DATA_TYPE(pSchema[index].type)) {
return invalidSqlErrMsg(msgBuf, msg2);
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExpr** pExpr, SCondExpr* pCondExpr,
int32_t* type, int32_t parentOptr) {
const char* msg1 = "table query cannot use tags filter";
......@@ -3736,8 +3771,7 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
const char* msg5 = "not support ordinary column join";
const char* msg6 = "only one query condition on tbname allowed";
const char* msg7 = "only in/like allowed in filter table name";
const char* msg8 = "wildcard string should be less than 20 characters";
tSqlExpr* pLeft = (*pExpr)->pLeft;
tSqlExpr* pRight = (*pExpr)->pRight;
......@@ -3753,6 +3787,18 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
// validate the null expression
int32_t code = validateNullExpr(*pExpr, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// validate the like expression
code = validateLikeExpr(*pExpr, pTableMeta, index.columnIndex, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (index.columnIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) { // query on time range
if (!validateJoinExprNode(pCmd, pQueryInfo, *pExpr, &index)) {
return TSDB_CODE_TSC_INVALID_SQL;
......@@ -3774,7 +3820,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
int16_t leftIdx = index.tableIndex;
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
if (getColumnIndexByName(pCmd, &pRight->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -3821,20 +3866,6 @@ static int32_t handleExprInQueryCond(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSql
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
// check for like expression
if ((*pExpr)->tokenId == TK_LIKE) {
if (pRight->value.nLen > TSDB_PATTERN_STRING_MAX_LEN) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
SSchema* pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
if ((!isTablenameToken(&pLeft->colInfo)) && pSchema[index.columnIndex].type != TSDB_DATA_TYPE_BINARY &&
pSchema[index.columnIndex].type != TSDB_DATA_TYPE_NCHAR) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
// in case of in operator, keep it in a seprate attribute
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
if (!validTableNameOptr(*pExpr)) {
......
......@@ -309,7 +309,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
int32_t offset = 0;
for (int32_t i = 0; i < pRes->numOfCols; ++i) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
pRes->urow[i] = pRes->data + offset * pRes->numOfRows;
......
......@@ -238,9 +238,11 @@ static int32_t dnodeInitStorage() {
}
TDIR *tdir = tfsOpendir("vnode_bak/.staging");
if (tfsReaddir(tdir) != NULL) {
bool stagingNotEmpty = tfsReaddir(tdir) != NULL;
tfsClosedir(tdir);
if (stagingNotEmpty) {
dError("vnode_bak/.staging dir not empty, fix it first.");
tfsClosedir(tdir);
return -1;
}
......
......@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else {
if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, 0, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
}
if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp);
......
此差异已折叠。
......@@ -86,7 +86,8 @@ typedef struct SResultRow {
bool closed; // this result status: closed or opened
uint32_t numOfRows; // number of rows of current time window
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
union {STimeWindow win; char* key;}; // start key of current result row
STimeWindow win;
char* key; // start key of current result row
} SResultRow;
typedef struct SGroupResInfo {
......
......@@ -675,6 +675,7 @@ expr(A) ::= STRING(X). { A = tSqlExprCreateIdValue(&X, TK_STRING);}
expr(A) ::= NOW(X). { A = tSqlExprCreateIdValue(&X, TK_NOW); }
expr(A) ::= VARIABLE(X). { A = tSqlExprCreateIdValue(&X, TK_VARIABLE);}
expr(A) ::= BOOL(X). { A = tSqlExprCreateIdValue(&X, TK_BOOL);}
expr(A) ::= NULL(X). { A = tSqlExprCreateIdValue(&X, TK_NULL);}
// ordinary functions: min(x), max(x), top(k, 20)
expr(A) ::= ID(X) LP exprlist(Y) RP(E). { A = tSqlExprCreateFunction(Y, &X, &E, X.type); }
......
......@@ -200,6 +200,7 @@ static bool isPointInterpoQuery(SQuery *pQuery);
static void setResultBufSize(SQuery* pQuery, SRspResultInfo* pResultInfo);
static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable);
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
int32_t groupIndex);
......@@ -1330,6 +1331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
int16_t bytes = pColInfoData->info.bytes;
int16_t type = pColInfoData->info.type;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (type == TSDB_DATA_TYPE_FLOAT || type == TSDB_DATA_TYPE_DOUBLE) {
qError("QInfo:%"PRIu64" group by not supported on double/float columns, abort", GET_QID(pRuntimeEnv));
......@@ -1350,6 +1352,10 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
memcpy(pInfo->prevData, val, bytes);
if (pQuery->stableQuery && pQuery->stabledev && (pRuntimeEnv->prevResult != NULL)) {
setParamForStableStddevByColData(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, pOperator->pExpr, val, bytes);
}
int32_t ret =
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
......@@ -1870,14 +1876,15 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
taosHashCleanup(pRuntimeEnv->pResultRowHashTable);
pRuntimeEnv->pResultRowHashTable = NULL;
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
taosHashCleanup(pRuntimeEnv->pTableRetrieveTsMap);
pRuntimeEnv->pTableRetrieveTsMap = NULL;
destroyOperatorInfo(pRuntimeEnv->proot);
pRuntimeEnv->pool = destroyResultRowPool(pRuntimeEnv->pool);
taosArrayDestroyEx(pRuntimeEnv->prevResult, freeInterResult);
pRuntimeEnv->prevResult = NULL;
}
static bool needBuildResAfterQueryComplete(SQInfo* pQInfo) {
......@@ -3396,6 +3403,42 @@ void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
}
void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes) {
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t numOfExprs = pQuery->numOfOutput;
for(int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExprInfo = &(pExpr[i]);
if (pExprInfo->base.functionId != TSDB_FUNC_STDDEV_DST) {
continue;
}
SSqlFuncMsg* pFuncMsg = &pExprInfo->base;
pCtx[i].param[0].arr = NULL;
pCtx[i].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
// TODO use hash to speedup this loop
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
for (int32_t j = 0; j < numOfGroup; ++j) {
SInterResult* p = taosArrayGet(pRuntimeEnv->prevResult, j);
if (bytes == 0 || memcmp(p->tags, val, bytes) == 0) {
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
for (int32_t k = 0; k < numOfCols; ++k) {
SStddevInterResult* pres = taosArrayGet(p->pResult, k);
if (pres->colId == pFuncMsg->colInfo.colId) {
pCtx[i].param[0].arr = pres->pResult;
break;
}
}
}
}
}
}
/*
* There are two cases to handle:
*
......@@ -6421,6 +6464,9 @@ void freeQInfo(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
releaseQueryBuf(pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
teardownQueryRuntimeEnv(&pQInfo->runtimeEnv);
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
......@@ -6456,7 +6502,6 @@ void freeQInfo(SQInfo *pQInfo) {
}
}
doDestroyTableQueryInfo(&pRuntimeEnv->tableqinfoGroupInfo);
tfree(pQInfo->pBuf);
tfree(pQInfo->sql);
......
......@@ -127,7 +127,12 @@ tSqlExpr *tSqlExprCreateIdValue(SStrToken *pToken, int32_t optrType) {
pSqlExpr->token = *pToken;
}
if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
if (optrType == TK_NULL) {
pToken->type = TSDB_DATA_TYPE_NULL;
tVariantCreate(&pSqlExpr->value, pToken);
pSqlExpr->tokenId = optrType;
pSqlExpr->type = SQL_NODE_VALUE;
} else if (optrType == TK_INTEGER || optrType == TK_STRING || optrType == TK_FLOAT || optrType == TK_BOOL) {
toTSDBType(pToken->type);
tVariantCreate(&pSqlExpr->value, pToken);
......@@ -356,7 +361,11 @@ void tSqlExprCompact(tSqlExpr** pExpr) {
bool tSqlExprIsLeaf(tSqlExpr* pExpr) {
return (pExpr->pRight == NULL && pExpr->pLeft == NULL) &&
(pExpr->tokenId == 0 || pExpr->tokenId == TK_ID || (pExpr->tokenId >= TK_BOOL && pExpr->tokenId <= TK_NCHAR) || pExpr->tokenId == TK_SET);
(pExpr->tokenId == 0 ||
(pExpr->tokenId == TK_ID) ||
(pExpr->tokenId >= TK_BOOL && pExpr->tokenId <= TK_NCHAR) ||
(pExpr->tokenId == TK_NULL) ||
(pExpr->tokenId == TK_SET));
}
bool tSqlExprIsParentOfLeaf(tSqlExpr* pExpr) {
......
......@@ -66,8 +66,8 @@ void cleanupResultRowInfo(SResultRowInfo *pResultRowInfo) {
return;
}
if (pResultRowInfo->type == TSDB_DATA_TYPE_BINARY || pResultRowInfo->type == TSDB_DATA_TYPE_NCHAR) {
for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
for(int32_t i = 0; i < pResultRowInfo->size; ++i) {
if (pResultRowInfo->pResult[i]) {
tfree(pResultRowInfo->pResult[i]->key);
}
}
......@@ -153,11 +153,8 @@ void clearResultRow(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16
pResultRow->offset = -1;
pResultRow->closed = false;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
tfree(pResultRow->key);
} else {
pResultRow->win = TSWINDOW_INITIALIZER;
}
tfree(pResultRow->key);
pResultRow->win = TSWINDOW_INITIALIZER;
}
// TODO refactor: use macro
......
......@@ -372,6 +372,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("QInfo:%"PRIu64" query killed", pQInfo->qId);
setQueryKilled(pQInfo);
// Wait for the query executing thread being stopped/
......
此差异已折叠。
......@@ -35,7 +35,7 @@ extern "C" {
#define SYNC_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead) + sizeof(SSyncHead) + 16)
#define SYNC_RECV_BUFFER_SIZE (5*1024*1024)
#define SYNC_MAX_FWDS 512
#define SYNC_MAX_FWDS 1024
#define SYNC_FWD_TIMER 300
#define SYNC_ROLE_TIMER 15000 // ms
#define SYNC_CHECK_INTERVAL 1000 // ms
......
......@@ -1459,7 +1459,12 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
if ((pNode->quorum > 1 || force) && code == 0) {
code = syncSaveFwdInfo(pNode, pWalHead->version, mhandle);
if (code >= 0) code = 1;
if (code >= 0) {
code = 1;
} else {
pthread_mutex_unlock(&pNode->mutex);
return code;
}
}
int32_t retLen = taosWriteMsg(pPeer->peerFd, pSyncHead, fwdLen);
......
......@@ -364,7 +364,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
qKillQuery(*handle);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
......@@ -409,7 +409,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
// client is broken, the query needs to be killed immediately.
int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
pMsg->qhandle = htobe64((uint64_t)qhandle);
pMsg->qId = htobe64(qId);
pMsg->header.vgId = htonl(vgId);
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
......
......@@ -91,13 +91,17 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
int32_t syncCode = 0;
bool force = (pWrite == NULL ? false : pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT);
syncCode = syncForwardToPeer(pVnode->sync, pHead, pWrite, qtype, force);
if (syncCode < 0) return syncCode;
if (syncCode < 0) {
pHead->version = 0;
return syncCode;
}
// write into WAL
code = walWrite(pVnode->wal, pHead);
if (code < 0) {
if (syncCode > 0) atomic_sub_fetch_32(&pWrite->processedCount, 1);
vError("vgId:%d, hver:%" PRIu64 " vver:%" PRIu64 " code:0x%x", pVnode->vgId, pHead->version, pVnode->version, code);
pHead->version = 0;
return code;
}
......
......@@ -16,45 +16,47 @@ package main
import (
"database/sql"
"flag"
"fmt"
_ "github.com/taosdata/driver-go/taosSql"
"math/rand"
"os"
"sync"
"runtime"
"strconv"
"sync"
"time"
"flag"
"math/rand"
_ "github.com/taosdata/driver-go/taosSql"
//"golang.org/x/sys/unix"
)
const (
maxLocationSize = 32
maxSqlBufSize = 65480
maxLocationSize = 32
//maxSqlBufSize = 65480
)
var locations = [maxLocationSize]string {
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
"HangZhou", "Tianjin", "Wuhan", "Changsha",
"Nanjing", "Xian"}
var locations = [maxLocationSize]string{
"Beijing", "Shanghai", "Guangzhou", "Shenzhen",
"HangZhou", "Tianjin", "Wuhan", "Changsha",
"Nanjing", "Xian"}
type config struct {
hostName string
serverPort int
user string
password string
dbName string
supTblName string
tablePrefix string
numOftables int
numOfRecordsPerTable int
numOfRecordsPerReq int
numOfThreads int
startTimestamp string
startTs int64
keep int
days int
hostName string
serverPort int
user string
password string
dbName string
supTblName string
tablePrefix string
numOftables int
numOfRecordsPerTable int
numOfRecordsPerReq int
numOfThreads int
startTimestamp string
startTs int64
keep int
days int
}
var configPara config
......@@ -62,7 +64,7 @@ var taosDriverName = "taosSql"
var url string
func init() {
flag.StringVar(&configPara.hostName, "h", "127.0.0.1","The host to connect to TDengine server.")
flag.StringVar(&configPara.hostName, "h", "127.0.0.1", "The host to connect to TDengine server.")
flag.IntVar(&configPara.serverPort, "p", 6030, "The TCP/IP port number to use for the connection to TDengine server.")
flag.StringVar(&configPara.user, "u", "root", "The TDengine user name to use when connecting to the server.")
flag.StringVar(&configPara.password, "P", "taosdata", "The password to use when connecting to the server.")
......@@ -80,14 +82,14 @@ func init() {
configPara.supTblName = "meters"
startTs, err := time.ParseInLocation("2006-01-02 15:04:05", configPara.startTimestamp, time.Local)
if err==nil {
configPara.startTs = startTs.UnixNano() / 1e6
if err == nil {
configPara.startTs = startTs.UnixNano() / 1e6
}
}
func printAllArgs() {
fmt.Printf("\n============= args parse result: =============\n")
fmt.Printf("hostName: %v\n", configPara.hostName)
fmt.Printf("hostName: %v\n", configPara.hostName)
fmt.Printf("serverPort: %v\n", configPara.serverPort)
fmt.Printf("usr: %v\n", configPara.user)
fmt.Printf("password: %v\n", configPara.password)
......@@ -104,10 +106,10 @@ func printAllArgs() {
func main() {
printAllArgs()
fmt.Printf("Please press enter key to continue....\n")
fmt.Scanln()
_, _ = fmt.Scanln()
url = "root:taosdata@/tcp(" + configPara.hostName + ":" + strconv.Itoa(configPara.serverPort) + ")/"
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
//url = fmt.Sprintf("%s:%s@/tcp(%s:%d)/%s?interpolateParams=true", configPara.user, configPara.password, configPara.hostName, configPara.serverPort, configPara.dbName)
// open connect to taos server
//db, err := sql.Open(taosDriverName, url)
//if err != nil {
......@@ -115,7 +117,7 @@ func main() {
// os.Exit(1)
//}
//defer db.Close()
rand.Seed(time.Now().Unix())
rand.Seed(time.Now().Unix())
createDatabase(configPara.dbName, configPara.supTblName)
fmt.Printf("======== create database success! ========\n\n")
......@@ -138,7 +140,7 @@ func main() {
func createDatabase(dbName string, supTblName string) {
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
......@@ -165,27 +167,27 @@ func createDatabase(dbName string, supTblName string) {
checkErr(err, sqlStr)
}
func multiThreadCreateTable(threads int, ntables int, dbName string, tablePrefix string) {
func multiThreadCreateTable(threads int, nTables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
if threads < 1 {
threads = 1
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
a := nTables / threads
if a < 1 {
threads = nTables
a = 1
}
b := ntables % threads;
b := nTables % threads
last := 0;
last := 0
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
if i < b {
endTblId = last + a
} else {
endTblId = last + a - 1
......@@ -206,42 +208,43 @@ func createTable(dbName string, childTblPrefix string, startTblId int, endTblId
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
for i := startTblId; i <= endTblId; i++ {
sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
//fmt.Printf("sqlStr: %v\n", sqlStr)
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
}
wg.Done()
runtime.Goexit()
for i := startTblId; i <= endTblId; i++ {
sqlStr := "create table if not exists " + dbName + "." + childTblPrefix + strconv.Itoa(i) + " using " + dbName + ".meters tags('" + locations[i%maxLocationSize] + "', " + strconv.Itoa(i) + ");"
//fmt.Printf("sqlStr: %v\n", sqlStr)
_, err = db.Exec(sqlStr)
checkErr(err, sqlStr)
}
wg.Done()
runtime.Goexit()
}
func generateRowData(ts int64) string {
voltage := rand.Int() % 1000
current := 200 + rand.Float32()
phase := rand.Float32()
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
return values
voltage := rand.Int() % 1000
current := 200 + rand.Float32()
phase := rand.Float32()
values := "( " + strconv.FormatInt(ts, 10) + ", " + strconv.FormatFloat(float64(current), 'f', 6, 64) + ", " + strconv.Itoa(voltage) + ", " + strconv.FormatFloat(float64(phase), 'f', 6, 64) + " ) "
return values
}
func insertData(dbName string, childTblPrefix string, startTblId int, endTblId int, wg *sync.WaitGroup) {
//fmt.Printf("subThread[%d]: insert data to table from %d to %d \n", unix.Gettid(), startTblId, endTblId)
// windows.GetCurrentThreadId()
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
tmpTs := configPara.startTs;
tmpTs := configPara.startTs
//rand.New(rand.NewSource(time.Now().UnixNano()))
for tID := startTblId; tID <= endTblId; tID++{
for tID := startTblId; tID <= endTblId; tID++ {
totalNum := 0
for {
sqlStr := "insert into " + dbName + "." + childTblPrefix + strconv.Itoa(tID) + " values "
......@@ -249,13 +252,13 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
for {
tmpTs += 1000
valuesOfRow := generateRowData(tmpTs)
currRowNum += 1
totalNum += 1
currRowNum += 1
totalNum += 1
sqlStr = fmt.Sprintf("%s %s", sqlStr, valuesOfRow)
if (currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable) {
break
if currRowNum >= configPara.numOfRecordsPerReq || totalNum >= configPara.numOfRecordsPerTable {
break
}
}
......@@ -265,12 +268,12 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
count, err := res.RowsAffected()
checkErr(err, "rows affected")
if (count != int64(currRowNum)) {
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
if count != int64(currRowNum) {
fmt.Printf("insert data, expect affected:%d, actual:%d\n", currRowNum, count)
os.Exit(1)
}
if (totalNum >= configPara.numOfRecordsPerTable) {
if totalNum >= configPara.numOfRecordsPerTable {
break
}
}
......@@ -279,44 +282,46 @@ func insertData(dbName string, childTblPrefix string, startTblId int, endTblId i
wg.Done()
runtime.Goexit()
}
func multiThreadInsertData(threads int, ntables int, dbName string, tablePrefix string) {
func multiThreadInsertData(threads int, nTables int, dbName string, tablePrefix string) {
st := time.Now().UnixNano()
if (threads < 1) {
threads = 1;
if threads < 1 {
threads = 1
}
a := ntables / threads;
if (a < 1) {
threads = ntables;
a = 1;
a := nTables / threads
if a < 1 {
threads = nTables
a = 1
}
b := ntables % threads;
b := nTables % threads
last := 0;
last := 0
endTblId := 0
wg := sync.WaitGroup{}
for i := 0; i < threads; i++ {
startTblId := last
if (i < b ) {
if i < b {
endTblId = last + a
} else {
endTblId = last + a - 1
}
last = endTblId + 1
wg.Add(1)
go insertData(dbName, tablePrefix, startTblId , endTblId, &wg)
go insertData(dbName, tablePrefix, startTblId, endTblId, &wg)
}
wg.Wait()
et := time.Now().UnixNano()
fmt.Printf("insert data spent duration: %6.6fs\n", (float32(et-st))/1e9)
}
func selectTest(dbName string, tbPrefix string, supTblName string){
func selectTest(dbName string, tbPrefix string, supTblName string) {
db, err := sql.Open(taosDriverName, url)
if err != nil {
fmt.Println("Open database error: %s\n", err)
fmt.Printf("Open database error: %s\n", err)
os.Exit(1)
}
defer db.Close()
......@@ -332,12 +337,12 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
fmt.Printf("query sql: %s\n", sqlStr)
for rows.Next() {
var (
ts string
current float32
voltage int
phase float32
location string
groupid int
ts string
current float32
voltage int
phase float32
location string
groupid int
)
err := rows.Scan(&ts, &current, &voltage, &phase, &location, &groupid)
if err != nil {
......@@ -352,7 +357,7 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
}
// select sql 2
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa( rand.Int() % configPara.numOftables)
sqlStr = "select avg(voltage), min(voltage), max(voltage) from " + dbName + "." + tbPrefix + strconv.Itoa(rand.Int()%configPara.numOftables)
rows, err = db.Query(sqlStr)
checkErr(err, sqlStr)
......@@ -360,9 +365,9 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
fmt.Printf("\nquery sql: %s\n", sqlStr)
for rows.Next() {
var (
voltageAvg float32
voltageMin int
voltageMax int
voltageAvg float32
voltageMin int
voltageMax int
)
err := rows.Scan(&voltageAvg, &voltageMin, &voltageMax)
if err != nil {
......@@ -385,10 +390,10 @@ func selectTest(dbName string, tbPrefix string, supTblName string){
fmt.Printf("\nquery sql: %s\n", sqlStr)
for rows.Next() {
var (
lastTs string
lastCurrent float32
lastVoltage int
lastPhase float32
lastTs string
lastCurrent float32
lastVoltage int
lastPhase float32
)
err := rows.Scan(&lastTs, &lastCurrent, &lastVoltage, &lastPhase)
if err != nil {
......
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 0,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}]
}]
}]
}
{
"filetype": "insert",
"cfgdir": "/etc/taos",
"host": "127.0.0.1",
"port": 6030,
"user": "root",
"password": "taosdata",
"thread_count": 4,
"thread_count_create_tbl": 4,
"result_file": "./insert_res.txt",
"confirm_parameter_prompt": "no",
"insert_interval": 0,
"num_of_records_per_req": 100,
"max_sql_len": 1024000,
"databases": [{
"dbinfo": {
"name": "db",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
"blocks": 8,
"precision": "ms",
"keep": 365,
"minRows": 100,
"maxRows": 4096,
"comp":2,
"walLevel":1,
"cachelast":0,
"quorum":1,
"fsync":3000,
"update": 0
},
"super_tables": [{
"name": "stb",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
"data_source": "rand",
"insert_mode": "taosc",
"insert_rows": 1000,
"childtable_limit": 33,
"childtable_offset": 33,
"multi_thread_write_one_tbl": "no",
"number_of_tbl_in_one_sql": 0,
"max_sql_len": 1024000,
"disorder_ratio": 0,
"disorder_range": 1000,
"timestamp_step": 1,
"start_timestamp": "2020-10-01 00:00:00.000",
"sample_format": "csv",
"sample_file": "./sample.csv",
"tags_file": "",
"columns": [{"type": "INT"}, {"type": "DOUBLE", "count":10}, {"type": "BINARY", "len": 16, "count":3}, {"type": "BINARY", "len": 32, "count":6}],
"tags": [{"type": "TINYINT", "count":2}, {"type": "BINARY", "len": 16, "count":5}]
}]
}]
}
......@@ -15,7 +15,7 @@
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
......@@ -33,7 +33,7 @@
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
......
......@@ -15,7 +15,7 @@
"databases": [{
"dbinfo": {
"name": "db",
"drop": "yes",
"drop": "no",
"replica": 1,
"days": 10,
"cache": 16,
......@@ -33,7 +33,7 @@
},
"super_tables": [{
"name": "stb",
"child_table_exists":"no",
"child_table_exists":"yes",
"childtable_count": 100,
"childtable_prefix": "stb_",
"auto_create_table": "no",
......
......@@ -16,8 +16,6 @@
"name": "stb",
"child_table_exists":"no",
"childtable_count": 20,
"childtable_limit": 10,
"childtable_offset": 0,
"childtable_prefix": "t_",
"auto_create_table": "no",
"data_source": "sample",
......
......@@ -51,7 +51,8 @@ class TDTestCase:
else:
tdLog.info("taosd found in %s" % buildPath)
binPath = buildPath+ "/build/bin/"
os.system("%staosdemo -f tools/insert-tblimit-tboffset.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-insertrec.json" % binPath)
tdSql.execute("use db")
tdSql.query("select count(tbname) from db.stb")
......@@ -59,6 +60,7 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 33000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit-tboffset0.json" % binPath)
tdSql.execute("reset query cache")
......@@ -68,6 +70,7 @@ class TDTestCase:
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 20000)
os.system("%staosdemo -f tools/insert-tblimit-tboffset-createdb.json" % binPath)
os.system("%staosdemo -f tools/insert-tblimit1-tboffset.json" % binPath)
tdSql.execute("reset query cache")
......
......@@ -57,7 +57,7 @@ class TDTestCase:
tdSql.query("select count(tbname) from db.stb")
tdSql.checkData(0, 0, 20)
tdSql.query("select count(*) from db.stb")
tdSql.checkData(0, 0, 200)
tdSql.checkData(0, 0, 400)
def stop(self):
tdSql.close()
......
......@@ -325,4 +325,56 @@ if $row != 0 then
return -1
endi
print ===============================>td-3621
sql create table ttm2(ts timestamp, k bool);
sql insert into ttm2 values('2021-1-1 1:1:1', true)
sql insert into ttm2 values('2021-1-1 1:1:2', NULL)
sql insert into ttm2 values('2021-1-1 1:1:3', false)
sql select * from ttm2 where k is not null
if $row != 2 then
return -1
endi
if $data00 != @21-01-01 01:01:01.000@ then
print expect 21-01-01 01:01:01.000, actual $data00
return -1
endi
sql select * from ttm2 where k is null
if $row != 1 then
return -1
endi
if $data00 != @21-01-01 01:01:02.000@ then
return -1
endi
sql select * from ttm2 where k=true
if $row != 1 then
return -1
endi
if $data00 != @21-01-01 01:01:01.000@ then
return -1
endi
sql select * from ttm2 where k=false
if $row != 1 then
return -1
endi
if $data00 != @21-01-01 01:01:03.000@ then
return -1
endi
sql select * from ttm2 where k<>false
if $row != 1 then
return -1
endi
sql_error select * from ttm2 where k=null
sql_error select * from ttm2 where k<>null
sql_error select * from ttm2 where k like null
sql_error select * from ttm2 where k<null
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册