提交 3752a81f 编写于 作者: S Shengliang Guan

merge from master into develop

......@@ -82,6 +82,10 @@ tests/comparisonTest/opentsdb/opentsdbtest/.settings/
tests/examples/JDBC/JDBCDemo/.classpath
tests/examples/JDBC/JDBCDemo/.project
tests/examples/JDBC/JDBCDemo/.settings/
tests/script/api/batchprepare
tests/script/api/stmt
tests/script/api/stmtBatchTest
tests/script/api/stmtTest
# Emacs
# -*- mode: gitignore; -*-
......
......@@ -251,6 +251,77 @@ def pre_test_mac(){
'''
return 1
}
def pre_test_noinstall(){
sh'hostname'
sh'''
cd ${WKC}
git reset --hard HEAD~10 >/dev/null
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WKC}
git checkout master
'''
}
else if(env.CHANGE_TARGET == '2.0'){
sh '''
cd ${WKC}
git checkout 2.0
'''
}
else{
sh '''
cd ${WKC}
git checkout develop
'''
}
}
sh'''
cd ${WKC}
git pull >/dev/null
git fetch origin +refs/pull/${CHANGE_ID}/merge
git checkout -qf FETCH_HEAD
git clean -dfx
git submodule update --init --recursive
cd ${WK}
git reset --hard HEAD~10
'''
script {
if (env.CHANGE_TARGET == 'master') {
sh '''
cd ${WK}
git checkout master
'''
}
else if(env.CHANGE_TARGET == '2.0'){
sh '''
cd ${WK}
git checkout 2.0
'''
}
else{
sh '''
cd ${WK}
git checkout develop
'''
}
}
sh '''
cd ${WK}
git pull >/dev/null
export TZ=Asia/Harbin
date
git clean -dfx
mkdir debug
cd debug
cmake .. > /dev/null
make
'''
return 1
}
def pre_test_win(){
bat '''
taskkill /f /t /im python.exe
......@@ -429,7 +500,7 @@ pipeline {
stage('python_3_s6') {
agent{label " slave6 || slave16 "}
steps {
timeout(time: 55, unit: 'MINUTES'){
timeout(time: 65, unit: 'MINUTES'){
pre_test()
sh '''
date
......@@ -575,7 +646,7 @@ pipeline {
stage('test_b6_s9') {
agent{label " slave9 || slave19 "}
steps {
timeout(time: 55, unit: 'MINUTES'){
timeout(time: 105, unit: 'MINUTES'){
pre_test()
sh '''
date
......
......@@ -194,6 +194,7 @@ fi
if [[ "$dbName" == "pro" ]]; then
sed -i "s/taos config/prodb config/g" ${top_dir}/src/util/src/tconfig.c
sed -i "s/TDengine/ProDB/g" ${top_dir}/src/dnode/src/dnodeSystem.c
fi
echo "build ${pagMode} package ..."
......
......@@ -119,7 +119,8 @@ typedef struct SBlockKeyInfo {
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta);
int32_t tscCreateDataBlockData(STableDataBlocks* dataBuf, size_t defaultSize, int32_t rowSize, int32_t startOffset);
void tscDestroyDataBlock(SSqlObj *pSql, STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRowsRaw(STableDataBlocks* dataBuf);
int tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf, SBlockKeyInfo* pBlkKeyInfo);
int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows);
......
......@@ -237,7 +237,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
return;
}
if (pRes->qId == 0) {
if (pRes->qId == 0 && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
tscError("qhandle is invalid");
pRes->code = TSDB_CODE_TSC_INVALID_QHANDLE;
tscAsyncResultOnError(pSql);
......
......@@ -48,12 +48,14 @@ typedef struct SMultiTbStmt {
bool nameSet;
bool tagSet;
bool subSet;
bool tagColSet;
uint64_t currentUid;
char *sqlstr;
uint32_t tbNum;
SStrToken tbname;
SStrToken stbname;
SStrToken values;
SStrToken tagCols;
SArray *tags;
STableDataBlocks *lastBlock;
SHashObj *pTableHash;
......@@ -1250,6 +1252,12 @@ static void insertBatchClean(STscStmt* pStmt) {
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pSql, pCmd->insertParam.pDataBlocks);
pCmd->insertParam.numOfTables = 0;
STableDataBlocks** p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, NULL);
while(p) {
tfree((*p)->pData);
p = taosHashIterate(pCmd->insertParam.pTableBlockHashList, p);
}
taosHashClear(pCmd->insertParam.pTableBlockHashList);
tscFreeSqlResult(pSql);
tscFreeSubobj(pSql);
......@@ -1343,9 +1351,40 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
pStmt->mtb.stbname = sToken;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) {
tscError("keyword TAGS expected, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z ? sToken.z : pCmd->insertParam.sql);
if (sToken.n <= 0 || ((sToken.type != TK_TAGS) && (sToken.type != TK_LP))) {
tscError("invalid token, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "invalid token", sToken.z ? sToken.z : pCmd->insertParam.sql);
}
// ... (tag_col_list) TAGS(tag_val_list) ...
int32_t tagColsCnt = 0;
if (sToken.type == TK_LP) {
pStmt->mtb.tagColSet = true;
pStmt->mtb.tagCols = sToken;
int32_t tagColsStart = index;
while (1) {
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.type == TK_ILLEGAL) {
return tscSQLSyntaxErrMsg(pCmd->payload, "unrecognized token", sToken.z);
}
if (sToken.type == TK_ID) {
++tagColsCnt;
}
if (sToken.type == TK_RP) {
break;
}
}
if (tagColsCnt == 0) {
tscError("tag column list expected, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "tag column list expected", pCmd->insertParam.sql);
}
pStmt->mtb.tagCols.n = index - tagColsStart + 1;
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || sToken.type != TK_TAGS) {
tscError("keyword TAGS expected, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "keyword TAGS expected", sToken.z ? sToken.z : pCmd->insertParam.sql);
}
}
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
......@@ -1385,6 +1424,11 @@ int stmtParseInsertTbTags(SSqlObj* pSql, STscStmt* pStmt) {
return tscSQLSyntaxErrMsg(pCmd->payload, "no tags", pCmd->insertParam.sql);
}
if (tagColsCnt > 0 && taosArrayGetSize(pStmt->mtb.tags) != tagColsCnt) {
tscError("not match tags, sql:%s", pCmd->insertParam.sql);
return tscSQLSyntaxErrMsg(pCmd->payload, "not match tags", pCmd->insertParam.sql);
}
sToken = tStrGetToken(pCmd->insertParam.sql, &index, false);
if (sToken.n <= 0 || (sToken.type != TK_VALUES && sToken.type != TK_LP)) {
tscError("sql error, sql:%s", pCmd->insertParam.sql);
......@@ -1407,7 +1451,13 @@ int stmtGenInsertStatement(SSqlObj* pSql, STscStmt* pStmt, const char* name, TAO
int32_t j = 0;
while (1) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z);
if (pStmt->mtb.tagColSet) {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s %.*s tags(",
name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z, pStmt->mtb.tagCols.n, pStmt->mtb.tagCols.z);
} else {
len = (size_t)snprintf(str, size - 1, "insert into %s using %.*s tags(", name, pStmt->mtb.stbname.n, pStmt->mtb.stbname.z);
}
if (len >= (size -1)) {
size *= 2;
free(str);
......@@ -1659,6 +1709,13 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(TSDB_CODE_TSC_APP_ERROR);
}
if ((*t1)->pData == NULL) {
code = tscCreateDataBlockData(*t1, TSDB_PAYLOAD_SIZE, (*t1)->pTableMeta->tableInfo.rowSize, sizeof(SSubmitBlk));
if (code != TSDB_CODE_SUCCESS) {
STMT_RET(code);
}
}
SSubmitBlk* pBlk = (SSubmitBlk*) (*t1)->pData;
pCmd->batchSize = pBlk->numOfRows;
if (pBlk->numOfRows == 0) {
......@@ -1784,7 +1841,6 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
STMT_RET(code);
}
int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
......@@ -1792,8 +1848,6 @@ int taos_stmt_set_sub_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
......@@ -1801,7 +1855,6 @@ int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name) {
return taos_stmt_set_tbname_tags(stmt, name, NULL);
}
int taos_stmt_close(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
if (pStmt == NULL || pStmt->taos == NULL) {
......@@ -1868,7 +1921,6 @@ int taos_stmt_bind_param(TAOS_STMT* stmt, TAOS_BIND* bind) {
}
}
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -1932,8 +1984,6 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind, in
STMT_RET(insertStmtBindParamBatch(pStmt, bind, colIdx));
}
int taos_stmt_add_batch(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
STMT_CHECK
......@@ -2086,7 +2136,6 @@ int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
}
}
char *taos_stmt_errstr(TAOS_STMT *stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -2097,8 +2146,6 @@ char *taos_stmt_errstr(TAOS_STMT *stmt) {
return taos_errstr(pStmt->pSql);
}
const char *taos_data_type(int type) {
switch (type) {
case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL";
......@@ -2115,4 +2162,3 @@ const char *taos_data_type(int type) {
default: return "UNKNOWN";
}
}
......@@ -468,7 +468,7 @@ SSqlObj* recreateSqlObj(SSub* pSub) {
}
registerSqlObj(pSql);
pSql->rootObj = pSql;
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
tsem_wait(&pSub->sem);
......
......@@ -187,7 +187,7 @@ bool tscQueryTags(SQueryInfo* pQueryInfo) {
continue;
}
if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG) {
if (functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_TID_TAG && functId != TSDB_FUNC_BLKINFO) {
return false;
}
}
......@@ -1845,6 +1845,32 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
int32_t code = tscCreateDataBlockData(dataBuf, defaultSize, rowSize, startOffset);
if (code != TSDB_CODE_SUCCESS) {
tfree(dataBuf);
return code;
}
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
int32_t tscCreateDataBlockData(STableDataBlocks* dataBuf, size_t defaultSize, int32_t rowSize, int32_t startOffset) {
assert(dataBuf != NULL);
dataBuf->nAllocSize = (uint32_t)defaultSize;
dataBuf->headerSize = startOffset;
......@@ -1857,30 +1883,16 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff
dataBuf->pData = malloc(dataBuf->nAllocSize);
if (dataBuf->pData == NULL) {
tscError("failed to allocated memory, reason:%s", strerror(errno));
tfree(dataBuf);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
memset(dataBuf->pData, 0, sizeof(SSubmitBlk));
//Here we keep the tableMeta to avoid it to be remove by other threads.
dataBuf->pTableMeta = tscTableMetaDup(pTableMeta);
SParsedDataColInfo* pColInfo = &dataBuf->boundColumnInfo;
SSchema* pSchema = tscGetTableSchema(dataBuf->pTableMeta);
tscSetBoundColumnInfo(pColInfo, pSchema, dataBuf->pTableMeta->tableInfo.numOfColumns);
dataBuf->ordered = true;
dataBuf->prevTS = INT64_MIN;
dataBuf->rowSize = rowSize;
dataBuf->size = startOffset;
dataBuf->tsSource = -1;
dataBuf->vgId = dataBuf->pTableMeta->vgId;
tNameAssign(&dataBuf->tableName, name);
assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);
*dataBlocks = dataBuf;
return TSDB_CODE_SUCCESS;
}
......
......@@ -253,9 +253,10 @@ int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPo
}
if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1;
if (numOfRows > 0) {
if (((rowOffset == 0) && (numOfRows > 0)) || ((rowOffset == -1) && (numOfRows >= 0))) {
// Find the first not null value, fill all previouse values as NULL
dataColSetNEleNull(pCol, numOfRows);
dataColSetNEleNull(pCol, numOfRows - rowOffset);
}
}
......@@ -463,9 +464,7 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
int rcol = 0;
int dcol = 0;
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
......@@ -476,14 +475,22 @@ static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols
STColumn *pRowCol = schemaColAt(pSchema, rcol);
if (pRowCol->colId == pDataCol->colId) {
void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
if (rowOffset == 0) {
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
} else if (rowOffset == -1) {
// for update 2
if (!isNull(value, pDataCol->type)) {
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
}
} else {
ASSERT(0);
}
dcol++;
rcol++;
} else if (pRowCol->colId < pDataCol->colId) {
rcol++;
} else {
if(forceSetNull || setCol) {
if(forceSetNull) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
}
dcol++;
......@@ -501,7 +508,6 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
int nRowCols = kvRowNCols(row);
while (dcol < pCols->numOfCols) {
bool setCol = 0;
SDataCol *pDataCol = &(pCols->cols[dcol]);
if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
......@@ -513,14 +519,22 @@ static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCo
if (colIdx->colId == pDataCol->colId) {
void *value = tdGetKvRowDataOfCol(row, colIdx->offset);
if(!isNull(value, pDataCol->type)) setCol = 1;
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
if (rowOffset == 0) {
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
} else if (rowOffset == -1) {
// for update 2
if (!isNull(value, pDataCol->type)) {
dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints, rowOffset);
}
} else {
ASSERT(0);
}
++dcol;
++rcol;
} else if (colIdx->colId < pDataCol->colId) {
++rcol;
} else {
if(forceSetNull || setCol) {
if (forceSetNull) {
dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints, rowOffset);
}
++dcol;
......
......@@ -27,6 +27,9 @@
#include "tulog.h"
#include "tutil.h"
// TSDB
bool tsdbForceKeepFile = false;
// cluster
char tsFirst[TSDB_EP_LEN] = {0};
char tsSecond[TSDB_EP_LEN] = {0};
......
......@@ -5,6 +5,7 @@ import com.taosdata.jdbc.TSDBErrorNumbers;
import org.apache.http.HeaderElement;
import org.apache.http.HeaderElementIterator;
import org.apache.http.HttpEntity;
import org.apache.http.NoHttpResponseException;
import org.apache.http.client.ClientProtocolException;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.*;
......
......@@ -1139,7 +1139,7 @@ static int32_t mnodeAlterDbFp(SMnodeMsg *pMsg) {
return sdbUpdateRow(&row);
}
//bnNotify();
bnNotify();
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
}
......
......@@ -121,7 +121,7 @@ static int32_t mnodeProcessShowMsg(SMnodeMsg *pMsg) {
}
if (!tsMnodeShowMetaFp[pShowMsg->type] || !tsMnodeShowRetrieveFp[pShowMsg->type]) {
mError("show type:%s is not support", mnodeGetShowType(pShowMsg->type));
mWarn("show type:%s is not support", mnodeGetShowType(pShowMsg->type));
return TSDB_CODE_COM_OPS_NOT_SUPPORT;
}
......
......@@ -742,19 +742,6 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
return 0;
}
static bool mnodeFilterVgroups(SVgObj *pVgroup, STableObj *pTable) {
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
return true;
}
SCTableObj *pCTable = (SCTableObj *)pTable;
if (pVgroup->vgId == pCTable->vgId) {
return true;
} else {
return false;
}
}
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
......@@ -770,11 +757,6 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
return 0;
}
STableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) {
pTable = mnodeGetTable(pShow->payload);
}
while (numOfRows < rows) {
pShow->pIter = mnodeGetNextVgroup(pShow->pIter, &pVgroup);
if (pVgroup == NULL) break;
......@@ -784,11 +766,6 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
continue;
}
if (!mnodeFilterVgroups(pVgroup, pTable)) {
mnodeDecVgroupRef(pVgroup);
continue;
}
cols = 0;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -842,7 +819,6 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
pShow->numOfReads += numOfRows;
mnodeDecTableRef(pTable);
mnodeDecDbRef(pDb);
return numOfRows;
......
......@@ -424,29 +424,44 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
}
} //end switch fromPrecision
end_:
if (tempResult > (double)INT64_MAX) return INT64_MAX;
if (tempResult < (double)INT64_MIN) return INT64_MIN + 1; // INT64_MIN means NULL
if (tempResult >= (double)INT64_MAX) return INT64_MAX;
if (tempResult <= (double)INT64_MIN) return INT64_MIN + 1; // INT64_MIN means NULL
return time;
}
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
switch (unit) {
case 's':
case 's':{
double temp = ((double)val) * MILLISECOND_PER_SECOND;
if (temp >= (double)INT64_MAX || temp <= (double)INT64_MIN) return -1;
(*result) = convertTimePrecision(val * MILLISECOND_PER_SECOND, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
case 'm':
}
case 'm':{
double temp = ((double)val) * MILLISECOND_PER_MINUTE;
if (temp >= (double)INT64_MAX || temp <= (double)INT64_MIN) return -1;
(*result) = convertTimePrecision(val * MILLISECOND_PER_MINUTE, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
case 'h':
}
case 'h':{
double temp = ((double)val) * MILLISECOND_PER_HOUR;
if (temp >= (double)INT64_MAX || temp <= (double)INT64_MIN) return -1;
(*result) = convertTimePrecision(val * MILLISECOND_PER_HOUR, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
case 'd':
}
case 'd': {
double temp = ((double)val) * MILLISECOND_PER_DAY;
if (temp >= (double)INT64_MAX || temp <= (double)INT64_MIN) return -1;
(*result) = convertTimePrecision(val * MILLISECOND_PER_DAY, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
case 'w':
}
case 'w': {
double temp = ((double)val) * MILLISECOND_PER_WEEK;
if (temp >= (double)INT64_MAX || temp <= (double)INT64_MIN) return -1;
(*result) = convertTimePrecision(val * MILLISECOND_PER_WEEK, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
}
case 'a':
(*result) = convertTimePrecision(val, TSDB_TIME_PRECISION_MILLI, timePrecision);
break;
......
......@@ -127,12 +127,6 @@ cmd ::= SHOW dbPrefix(X) VGROUPS. {
setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, 0);
}
cmd ::= SHOW dbPrefix(X) VGROUPS ids(Y). {
SStrToken token;
tSetDbName(&token, &X);
setShowOptions(pInfo, TSDB_MGMT_TABLE_VGROUP, &token, &Y);
}
//drop configure for tables
cmd ::= DROP TABLE ifexists(Y) ids(X) cpxName(Z). {
X.n += Z.n;
......
......@@ -821,7 +821,7 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, STimeWindow* w, int32_
if (pInfo->hasResult != DATA_SET_FLAG) {
return BLK_DATA_ALL_NEEDED;
} else {
return (pInfo->ts > w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
return (pInfo->ts >= w->ekey) ? BLK_DATA_NO_NEEDED : BLK_DATA_ALL_NEEDED;
}
}
......@@ -4459,7 +4459,7 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
"5th=[%d], 10th=[%d], 20th=[%d], 30th=[%d], 40th=[%d], 50th=[%d]\n\t "
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], SmallBlocks=[%d], Size=[%.3f(Kb)] Comp=[%.5g]\n\t "
"RowsInMem=[%d] \n\t",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
......
......@@ -342,7 +342,7 @@ int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg) {
tsdbError(
"vgId:%d failed to update tag value of table %s since version out of date, client tag version %d server tag "
"version %d",
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->tagSchema));
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), pMsg->tversion, schemaVersion(pTable->pSuper->tagSchema));
terrno = TSDB_CODE_TDB_TAG_VER_OUT_OF_DATE;
return -1;
}
......
......@@ -1377,66 +1377,63 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
return code;
}
static int doBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
int firstPos, lastPos, midPos = -1;
int numOfRows;
TSKEY* keyList;
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
if (num <= 0) return -1;
keyList = (TSKEY*)pValue;
firstPos = 0;
lastPos = num - 1;
if (order == TSDB_ORDER_DESC) {
// search last keyList[ret] < key order asc and keyList[ret] > key order desc
static int doBinarySearchKey(TSKEY* keyList, int num, int pos, TSKEY key, int order) {
// start end posistion
int s, e;
s = pos;
// check
assert(pos >=0 && pos < num);
assert(num > 0);
if (order == TSDB_ORDER_ASC) {
// find the first position which is smaller than the key
e = num - 1;
if (key < keyList[pos])
return -1;
while (1) {
if (key >= keyList[lastPos]) return lastPos;
if (key == keyList[firstPos]) return firstPos;
if (key < keyList[firstPos]) return firstPos - 1;
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
}
} else {
// check can return
if (key >= keyList[e])
return e;
if (key <= keyList[s])
return s;
if (e - s <= 1)
return s;
// change start or end position
int mid = s + (e - s + 1)/2;
if (keyList[mid] > key)
e = mid;
else if(keyList[mid] < key)
s = mid;
else
return mid;
}
} else { // DESC
// find the first position which is bigger than the key
while (1) {
if (key <= keyList[firstPos]) return firstPos;
if (key == keyList[lastPos]) return lastPos;
if (key > keyList[lastPos]) {
lastPos = lastPos + 1;
if (lastPos >= num)
return -1;
e = 0;
if (key > keyList[pos])
return -1;
while (1) {
// check can return
if (key <= keyList[e])
return e;
if (key >= keyList[s])
return s;
if (s - e <= 1)
return s;
// change start or end position
int mid = s - (s - e + 1)/2;
if (keyList[mid] < key)
e = mid;
else if(keyList[mid] > key)
s = mid;
else
return lastPos;
}
numOfRows = lastPos - firstPos + 1;
midPos = (numOfRows >> 1) + firstPos;
if (key < keyList[midPos]) {
lastPos = midPos - 1;
} else if (key > keyList[midPos]) {
firstPos = midPos + 1;
} else {
break;
}
return mid;
}
}
}
return midPos;
}
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end) {
......@@ -1844,7 +1841,6 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
// NOTE: reverse the order to find the end position in data block
int32_t endPos = -1;
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
SQueryFilePos* cur = &pQueryHandle->cur;
SDataCols* pCols = pQueryHandle->rhelper.pDCols[0];
......@@ -1857,7 +1853,9 @@ int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBl
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
} else {
assert(pCols->numOfRows > 0);
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
int pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0 : pBlockInfo->rows - 1;
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, pQueryHandle->window.ekey, pQueryHandle->order);
assert(endPos != -1);
cur->mixBlock = true;
}
......@@ -1877,17 +1875,16 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
assert(pCols->cols[0].type == TSDB_DATA_TYPE_TIMESTAMP && pCols->cols[0].colId == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
cur->pos >= 0 && cur->pos < pBlock->numOfRows);
TSKEY* tsArray = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && tsArray[0] == pBlock->keyFirst && tsArray[pBlock->numOfRows-1] == pBlock->keyLast);
// for search the endPos, so the order needs to reverse
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
// key read from file
TSKEY* keyFile = pCols->cols[0].pData;
assert(pCols->numOfRows == pBlock->numOfRows && keyFile[0] == pBlock->keyFirst && keyFile[pBlock->numOfRows-1] == pBlock->keyLast);
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
STable* pTable = pCheckInfo->pTableObj;
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
"end:%d, 0x%"PRIx64,
......@@ -1902,6 +1899,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
STSchema* pSchema1 = NULL;
STSchema* pSchema2 = NULL;
// position in file ->fpos
int32_t pos = cur->pos;
cur->win = TSWINDOW_INITIALIZER;
......@@ -1924,9 +1922,13 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
break;
}
if (((pos > endPos || tsArray[pos] > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
((pos < endPos || tsArray[pos] < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
break;
// break if pos not in this block endPos range. note old code when pos is -1 can crash.
if(ASCENDING_TRAVERSE(pQueryHandle->order)) { //ASC
if(pos > endPos || keyFile[pos] > pQueryHandle->window.ekey)
break;
} else { //DESC
if(pos < endPos || keyFile[pos] < pQueryHandle->window.ekey)
break;
}
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
......@@ -1942,16 +1944,18 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, true);
numOfRows += 1;
// record start key with memory key if not
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
moveToNextRowInMem(pCheckInfo);
} else if (key == tsArray[pos]) { // data in buffer has the same timestamp of data in file block, ignore it
// same select mem key if update is true
} else if (keyMem == keyFile[pos]) {
if (pCfg->update) {
if(pCfg->update == TD_ROW_PARTIAL_UPDATE) {
doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, pos, pos);
......@@ -1969,31 +1973,36 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
mergeTwoRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row1, row2, numOfCols, pTable, pSchema1, pSchema2, forceSetNull);
numOfRows += 1;
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = key;
cur->win.skey = keyMem;
}
cur->win.ekey = key;
cur->lastKey = key + step;
cur->win.ekey = keyMem;
cur->lastKey = keyMem + step;
cur->mixBlock = true;
//mem move next
moveToNextRowInMem(pCheckInfo);
//file move next, discard file row
pos += step;
} else {
// not update, only mem move to next, discard mem row
moveToNextRowInMem(pCheckInfo);
}
} else if ((key > tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(key < tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
// put file row
} else if ((keyMem > keyFile[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
(keyMem < keyFile[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, key, order);
int32_t end = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pos, keyMem, pQueryHandle->order);
assert(end != -1);
if (tsArray[end] == key) { // the value of key in cache equals to the end timestamp value, ignore it
if (pCfg->update == TD_ROW_DISCARD_UPDATE) {
moveToNextRowInMem(pCheckInfo);
} else {
// can update, don't copy then deal on next loop with keyMem == keyFile[pos]
end -= step;
}
}
......@@ -2001,10 +2010,17 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
int32_t qstart = 0, qend = 0;
getQualifiedRowsPos(pQueryHandle, pos, end, numOfRows, &qstart, &qend);
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
pos += (qend - qstart + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[qend]:tsArray[qstart];
if(qend >= qstart) {
// copy qend - qstart + 1 rows from file
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, qstart, qend);
int32_t num = qend - qstart + 1;
pos += num * step;
} else {
// nothing copy from file
pos += step;
}
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[qend] : keyFile[qstart];
cur->lastKey = cur->win.ekey + step;
}
} while (numOfRows < pQueryHandle->outputCapacity);
......@@ -2021,7 +2037,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
!ASCENDING_TRAVERSE(pQueryHandle->order))) {
// no data in cache or data in cache is greater than the ekey of time window, load data from file block
if (cur->win.skey == TSKEY_INITIAL_VAL) {
cur->win.skey = tsArray[pos];
cur->win.skey = keyFile[pos];
}
int32_t start = -1, end = -1;
......@@ -2030,7 +2046,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
numOfRows = doCopyRowsFromFileBlock(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, start, end);
pos += (end - start + 1) * step;
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? tsArray[end]:tsArray[start];
cur->win.ekey = ASCENDING_TRAVERSE(pQueryHandle->order)? keyFile[end] : keyFile[start];
cur->lastKey = cur->win.ekey + step;
cur->mixBlock = true;
}
......@@ -2946,6 +2962,9 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
// handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
if (pQueryHandle == NULL) {
return false;
}
if (emptyQueryTimewindow(pQueryHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
......@@ -3065,6 +3084,9 @@ static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SM
pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qId, pMemRef);
tfree(cond.colList);
if (pSecQueryHandle == NULL) {
goto out_of_memory;
}
// current table, only one table
STableCheckInfo* pCurrent = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
......
......@@ -279,6 +279,7 @@ python3 ./test.py -f query/queryCnameDisplay.py
# python3 ./test.py -f query/long_where_query.py
python3 test.py -f query/nestedQuery/queryWithSpread.py
python3 ./test.py -f query/bug6586.py
python3 ./test.py -f query/bug5903.py
#stream
python3 ./test.py -f stream/metric_1.py
......
......@@ -68,9 +68,9 @@ class TDTestCase:
tdSql.checkData(0, 0, "2018-09-17 09:00:10.000")
tdSql.checkData(0, 1, "2018-09-17 09:00:10.000")
tdSql.checkData(0, 3, "2018-09-17 09:00:10.000")
tdSql.checkData(1, 0, "2018-09-17 09:00:20.009")
tdSql.checkData(1, 1, "2018-09-17 09:00:20.009")
tdSql.checkData(1, 3, "2018-09-17 09:00:20.009")
tdSql.checkData(1, 0, "2018-09-17 09:00:20.000")
tdSql.checkData(1, 1, "2018-09-17 09:00:20.000")
tdSql.checkData(1, 3, "2018-09-17 09:00:20.000")
tdSql.query("select ts from(select ts,derivative(col, 10s, 0) from stb group by tbname)")
......@@ -150,6 +150,7 @@ class TDTestCase:
tdSql.error("select derivative(col, -106752999999999922222d, 0) from stb group by tbname"); #overflow error
tdSql.error("select derivative(col, 10y, 0) from stb group by tbname") #TD-10399, DB error: syntax error near '10y, 0) from stb group by tbname;'
tdSql.error("select derivative(col, -106752d, 0) from stb group by tbname") #TD-10398 overflow tips
tdSql.error("select derivative(col, 106751991168d, 0) from stb group by tbname") #TD-10398 overflow tips
def run(self):
tdSql.prepare()
......
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
def run(self):
# TD-5903 show db.vgroups xxx. xxx is invalid content, but still returns results.
tdSql.execute("create database if not exists test_5903")
tdSql.execute("show test_5903.vgroups")
tdSql.error("show test_5903.vgroups xxx")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())
此差异已折叠。
......@@ -178,6 +178,8 @@ class TDTestCase:
tdSql.checkRows(1)
self.tb193new = "table_193~!@#$%^&*()-_+=[]{}':,<.>/?stST0123456789table_192~!@#$%^&*()-_+=[]{}':,<.>/?stST0123456789table_192~!@#$%^&*()-_+=[]{}':,<.>/?stST0123456789table_192~!@#$%^&*()-_+=[]{}':,<.>/?stST123"
tdSql.error("create table db.`%s` using db.`%s` tags(1)" %(self.tb193new,self.stb1))
# case for TD-10691
tdSql.error("create table ttb1(ts timestamp, file int )")
......
......@@ -1765,7 +1765,7 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
int code = taos_stmt_prepare(stmt, sql, 0);
if (code != 0){
printf("failed to execute taos_stmt_prepare. error:%s\n", taos_stmt_errstr(stmt));
return -1;
exit(1);
}
int id = 0;
......@@ -1801,9 +1801,44 @@ int stmt_funcb_autoctb_e1(TAOS_STMT *stmt) {
return 0;
}
int stmt_multi_insert_check(TAOS_STMT *stmt) {
char *sql;
// The number of tag column list is not equal to the number of tag value list
sql = "insert into ? using stb1 (id1) tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
// The number of column list is not equal to the number of value list
sql = "insert into ? using stb1 tags(1,?,2,?,4,?,6.0,?,'b') "
"(ts, b, v1, v2, v4, v8, f4, f8, bin) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 () tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 ( tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
sql = "insert into ? using stb1 ) tags(1,?) values(?,?,?,?,?,?,?,?,?,?)";
if (0 == taos_stmt_prepare(stmt, sql, 0)) {
printf("failed to check taos_stmt_prepare. sql:%s\n", sql);
exit(1);
}
return 0;
}
//1 tables 10 records
int stmt_funcb_autoctb_e2(TAOS_STMT *stmt) {
......@@ -4509,7 +4544,6 @@ void* runcase(void *par) {
(void)idx;
#if 1
prepare(taos, 1, 1);
......@@ -4823,6 +4857,16 @@ void* runcase(void *par) {
#endif
#if 1
prepare(taos, 1, 0);
stmt = taos_stmt_init(taos);
printf("stmt_multi_insert_check start\n");
stmt_multi_insert_check(stmt);
printf("stmt_multi_insert_check end\n");
taos_stmt_close(stmt);
#endif
#if 1
prepare(taos, 1, 1);
......@@ -5011,7 +5055,6 @@ void* runcase(void *par) {
printf("check result end\n");
#endif
#if 1
preparem(taos, 0, idx);
......
......@@ -10,7 +10,7 @@ sql connect
print ======================== dnode1 start
sql create function add_one as '/tmp/add_one.so' outputtype int;
sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype int;
sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype bigint;
sql show functions;
if $rows != 2 then
return -1
......
......@@ -11,7 +11,7 @@ print ======================== dnode1 start
sql create function add_one as '/tmp/add_one.so' outputtype int;
sql create function add_one_64232 as '/tmp/add_one_64232.so' outputtype int;
sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype int;
sql create aggregate function sum_double as '/tmp/sum_double.so' outputtype bigint;
sql show functions;
if $rows != 3 then
return -1
......
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
int64_t length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
......@@ -14,31 +15,36 @@ typedef struct SUdfInit{
#define TSDB_DATA_INT_NULL 0x80000000L
#define TSDB_DATA_BIGINT_NULL 0x8000000000000000L
void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
void abs_max(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
long r = 0;
printf("abs_max input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
int64_t r = 0;
// printf("abs_max input data:%p, type:%d, rows:%d, ts:%p, %" PRId64 ", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 5) {
r=*(long *)dataOutput;
r=*(int64_t *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
if (*((long *)data + i) == TSDB_DATA_BIGINT_NULL) {
if (*((int64_t *)data + i) == TSDB_DATA_BIGINT_NULL) {
continue;
}
*numOfOutput=1;
long v = labs(*((long *)data + i));
//int64_t v = abs(*((int64_t *)data + i));
int64_t v = *((int64_t *)data + i);
if (v < 0) {
v = 0 - v;
}
if (v > r) {
r = v;
}
}
*(long *)dataOutput=r;
*(int64_t *)dataOutput=r;
printf("abs_max out, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
} else {
// printf("abs_max out, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}else {
*numOfOutput=0;
}
}
......@@ -47,44 +53,43 @@ void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts
void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
int r = 0;
printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
printf("abs_max finalize, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
//int64_t r = 0;
// printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
// *numOfOutput=1;
// printf("abs_max finalize, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
int r = 0;
int64_t r = 0;
if (numOfRows > 0) {
r = *((long *)data);
r = *((int64_t *)data);
}
printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
// printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 1; i < numOfRows; ++i) {
printf("abs_max_merge %d - %ld\n", i, *((long *)data + i));
if (*((long*)data + i) > r) {
r= *((long*)data + i);
// printf("abs_max_merge %d - %" PRId64"\n", i, *((int64_t *)data + i));
if (*((int64_t*)data + i) > r) {
r= *((int64_t*)data + i);
}
}
*(long*)dataOutput=r;
*(int64_t*)dataOutput=r;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
printf("abs_max_merge, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput);
// printf("abs_max_merge, dataoutput:%" PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
int abs_max_init(SUdfInit* buf) {
printf("abs_max init\n");
// printf("abs_max init\n");
return 0;
}
void abs_max_destroy(SUdfInit* buf) {
printf("abs_max destroy\n");
}
// printf("abs_max destroy\n");
}
\ No newline at end of file
......@@ -14,20 +14,18 @@ void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
// printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
for(i=0;i<numOfRows;++i) {
printf("input %d - %d", i, *((int *)data + i));
// printf("input %d - %d", i, *((int *)data + i));
*((int *)dataOutput+i)=*((int *)data + i) + 1;
printf(", output %d\n", *((int *)dataOutput+i));
// printf(", output %d\n", *((int *)dataOutput+i));
if (tsOutput) {
*(long long*)tsOutput=1000000;
}
}
*numOfOutput=numOfRows;
printf("add_one out, numOfOutput:%d\n", *numOfOutput);
// printf("add_one out, numOfOutput:%d\n", *numOfOutput);
}
}
}
\ No newline at end of file
......@@ -28,6 +28,4 @@ void add_one_64232(char* data, short itype, short ibytes, int numOfRows, long lo
printf("add_one_64232 out, numOfOutput:%d\n", *numOfOutput);
}
}
}
\ No newline at end of file
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <inttypes.h>
typedef struct SUdfInit{
int maybe_null; /* 1 if function can return NULL */
int decimals; /* for real functions */
long long length; /* For string functions */
int64_t length; /* For string functions */
char *ptr; /* free pointer for function data */
int const_item; /* 0 if result is independent of arguments */
} SUdfInit;
......@@ -13,13 +14,13 @@ typedef struct SUdfInit{
#define TSDB_DATA_INT_NULL 0x80000000L
void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput,
void sum_double(char* data, short itype, short ibytes, int numOfRows, int64_t* ts, char* dataOutput, char* interBuf, char* tsOutput,
int* numOfOutput, short otype, short obytes, SUdfInit* buf) {
int i;
int r = 0;
printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
int64_t r = 0;
printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%"PRId64", dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf);
if (itype == 4) {
r=*(int *)dataOutput;
r=*(int64_t *)dataOutput;
*numOfOutput=0;
for(i=0;i<numOfRows;++i) {
......@@ -29,10 +30,10 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
*numOfOutput=1;
r+=*((int *)data + i);
*(int *)dataOutput=r;
}
*(int64_t *)dataOutput=r;
}
printf("sum_double out, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
// printf("sum_double out, dataoutput:%"PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
}
......@@ -40,45 +41,44 @@ void sum_double(char* data, short itype, short ibytes, int numOfRows, long long*
void sum_double_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) {
int i;
int r = 0;
printf("sum_double_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf);
*numOfOutput=1;
*(int*)(buf->ptr)=*(int*)dataOutput*2;
*(int*)dataOutput=*(int*)(buf->ptr);
printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
int64_t r = 0;
// printf("sum_double_finalize dataoutput:%p:%"PRId64", numOfOutput:%d, buf:%p\n", dataOutput, *(int64_t*)dataOutput, *numOfOutput, buf);
// *numOfOutput=1;
*(int64_t*)(buf->ptr)=*(int64_t*)dataOutput*2;
*(int64_t*)dataOutput=*(int64_t*)(buf->ptr);
// printf("sum_double finalize, dataoutput:%"PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) {
void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int* numOfOutput, SUdfInit* buf) {
int r = 0;
int sum = 0;
int64_t sum = 0;
printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
// printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf);
for (int i = 0; i < numOfRows; ++i) {
printf("sum_double_merge %d - %d\n", i, *((int*)data + i));
sum +=*((int*)data + i);
// printf("sum_double_merge %d - %"PRId64"\n", i, *((int64_t*)data + i));
sum +=*((int64_t*)data + i);
}
*(int*)dataOutput+=sum;
*(int64_t*)dataOutput+=sum;
if (numOfRows > 0) {
*numOfOutput=1;
} else {
*numOfOutput=0;
}
printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput);
// printf("sum_double_merge, dataoutput:%"PRId64", numOfOutput:%d\n", *(int64_t *)dataOutput, *numOfOutput);
}
int sum_double_init(SUdfInit* buf) {
buf->maybe_null=1;
buf->ptr = malloc(sizeof(int));
printf("sum_double init\n");
buf->ptr = malloc(sizeof(int64_t));
// printf("sum_double init\n");
return 0;
}
void sum_double_destroy(SUdfInit* buf) {
free(buf->ptr);
printf("sum_double destroy\n");
}
// printf("sum_double destroy\n");
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册