提交 4db4f8a2 编写于 作者: L lichuang

Merge branch 'develop' into feature/TD-4556

......@@ -131,10 +131,10 @@ cmake .. -DCPUTYPE=mips64 && cmake --build .
### On Windows platform
If you use the Visual Studio 2013, please open a command window by executing "cmd.exe".
Please specify "x86_amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
Please specify "amd64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
```cmd
mkdir debug && cd debug
"C:\Program Files (x86)\Microsoft Visual Studio 12.0\VC\vcvarsall.bat" < x86_amd64 | x86 >
"C:\Program Files (x86)\Microsoft Visual Studio 12.0\VC\vcvarsall.bat" < amd64 | x86 >
cmake .. -G "NMake Makefiles"
nmake
```
......
......@@ -32,7 +32,7 @@ ELSEIF (TD_WINDOWS)
#INSTALL(TARGETS taos RUNTIME DESTINATION driver)
#INSTALL(TARGETS shell RUNTIME DESTINATION .)
IF (TD_MVN_INSTALLED)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.29.jar DESTINATION connector/jdbc)
INSTALL(FILES ${LIBRARY_OUTPUT_PATH}/taos-jdbcdriver-2.0.30.jar DESTINATION connector/jdbc)
ENDIF ()
ELSEIF (TD_DARWIN)
SET(TD_MAKE_INSTALL_SH "${TD_COMMUNITY_DIR}/packaging/tools/make_install.sh")
......
......@@ -123,6 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsDiffQuery(SQueryInfo* pQueryInfo);
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
......@@ -132,7 +133,6 @@ bool hasTagValOutput(SQueryInfo* pQueryInfo);
bool timeWindowInterpoRequired(SQueryInfo *pQueryInfo);
bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
......@@ -214,7 +214,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);
......@@ -329,9 +329,7 @@ STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage, uint64_t qId);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -320,7 +320,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......
......@@ -144,7 +144,7 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
}
// local merge has handle this situation during super table non-projection query.
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE) {
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE) {
pRes->numOfClauseTotal += pRes->numOfRows;
}
......@@ -174,7 +174,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
}
pSql->fp = fp;
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......@@ -257,14 +257,14 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
}
return;
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE) {
} else if (pCmd->command == TSDB_SQL_RETRIEVE || pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE) {
// in case of show command, return no data
(*pSql->fetchFp)(param, pSql, 0);
} else {
assert(0);
}
} else { // current query is not completed, continue retrieve from node
if (pCmd->command != TSDB_SQL_RETRIEVE_LOCALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
if (pCmd->command != TSDB_SQL_RETRIEVE_GLOBALMERGE && pCmd->command < TSDB_SQL_LOCAL) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
......
......@@ -323,7 +323,7 @@ TAOS_ROW tscFetchRow(void *param) {
// current data set are exhausted, fetch more data from node
if (pRes->row >= pRes->numOfRows && (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
......
此差异已折叠。
......@@ -63,6 +63,9 @@ static SExprInfo* doAddProjectCol(SQueryInfo* pQueryInfo, int32_t colIndex, int3
static int32_t setShowInfo(SSqlObj* pSql, SSqlInfo* pInfo);
static char* getAccountId(SSqlObj* pSql);
static bool serializeExprListToVariant(SArray* pList, tVariant **dest, int16_t colType);
static int32_t validateParamOfRelationIn(tVariant *pVar, int32_t colType);
static bool has(SArray* pFieldList, int32_t startIdx, const char* name);
static char* cloneCurrentDBName(SSqlObj* pSql);
static int32_t getDelimiterIndex(SStrToken* pTableName);
......@@ -134,14 +137,97 @@ static bool validateDebugFlag(int32_t v);
static int32_t checkQueryRangeForFill(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t loadAllTableMeta(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) {
static bool isTimeWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0;
}
int16_t getNewResColId(SSqlCmd* pCmd) {
return pCmd->resColumnId--;
}
// serialize expr in exprlist to binary
// formate "type | size | value"
bool serializeExprListToVariant(SArray* pList, tVariant **dst, int16_t colType) {
bool ret = false;
if (!pList || pList->size <= 0) {
return ret;
}
if (colType == TSDB_DATA_TYPE_DOUBLE || colType == TSDB_DATA_TYPE_FLOAT) {
return ret;
}
tSqlExprItem* item = (tSqlExprItem *)taosArrayGet(pList, 0);
int32_t firstTokenType = item->pNode->token.type;
int32_t type = firstTokenType;
//nchar to binary and
toTSDBType(type);
if (type != colType && (type != TSDB_DATA_TYPE_BINARY || colType != TSDB_DATA_TYPE_NCHAR)) {
return false;
}
type = colType;
SBufferWriter bw = tbufInitWriter( NULL, false );
tbufEnsureCapacity(&bw, 512);
int32_t size = (int32_t)(pList->size);
tbufWriteUint32(&bw, type);
tbufWriteInt32(&bw, size);
for (int32_t i = 0; i < size; i++) {
tSqlExpr* pSub = ((tSqlExprItem*)(taosArrayGet(pList, i)))->pNode;
// check all the token type in expr list same or not
if (firstTokenType != pSub->token.type) {
break;
}
toTSDBType(pSub->token.type);
tVariant var;
tVariantCreate(&var, &pSub->token);
if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT
|| type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) {
tbufWriteInt64(&bw, var.i64);
} else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
tbufWriteDouble(&bw, var.dKey);
} else if (type == TSDB_DATA_TYPE_BINARY){
tbufWriteBinary(&bw, var.pz, var.nLen);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
char *buf = (char *)calloc(1, (var.nLen + 1)*TSDB_NCHAR_SIZE);
if (tVariantDump(&var, buf, type, false) != TSDB_CODE_SUCCESS) {
free(buf);
tVariantDestroy(&var);
break;
}
tbufWriteBinary(&bw, buf, twcslen((wchar_t *)buf) * TSDB_NCHAR_SIZE);
free(buf);
}
tVariantDestroy(&var);
if (i == size - 1) { ret = true;}
}
if (ret == true) {
if ((*dst = calloc(1, sizeof(tVariant))) != NULL) {
tVariantCreateFromBinary(*dst, tbufGetData(&bw, false), tbufTell(&bw), TSDB_DATA_TYPE_BINARY);
} else {
ret = false;
}
}
tbufCloseWriter(&bw);
return ret;
}
static int32_t validateParamOfRelationIn(tVariant *pVar, int32_t colType) {
if (pVar->nType != TSDB_DATA_TYPE_BINARY) {
return -1;
}
SBufferReader br = tbufInitReader(pVar->pz, pVar->nLen, false);
return colType == TSDB_DATA_TYPE_NCHAR ? 0 : (tbufReadUint32(&br) == colType ? 0: -1);
}
static uint8_t convertOptr(SStrToken *pToken) {
switch (pToken->type) {
case TK_LT:
......@@ -162,6 +248,7 @@ static uint8_t convertOptr(SStrToken *pToken) {
return TSDB_RELATION_EQUAL;
case TK_PLUS:
return TSDB_BINARY_OP_ADD;
case TK_MINUS:
return TSDB_BINARY_OP_SUBTRACT;
case TK_STAR:
......@@ -177,6 +264,8 @@ static uint8_t convertOptr(SStrToken *pToken) {
return TSDB_RELATION_ISNULL;
case TK_NOTNULL:
return TSDB_RELATION_NOTNULL;
case TK_IN:
return TSDB_RELATION_IN;
default: { return 0; }
}
}
......@@ -2157,11 +2246,14 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
case TSDB_FUNC_MIN:
case TSDB_FUNC_MAX:
case TSDB_FUNC_DIFF:
case TSDB_FUNC_DERIVATIVE:
case TSDB_FUNC_STDDEV:
case TSDB_FUNC_LEASTSQR: {
// 1. valid the number of parameters
if (pItem->pNode->pParam == NULL || (functionId != TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 1) ||
(functionId == TSDB_FUNC_LEASTSQR && taosArrayGetSize(pItem->pNode->pParam) != 3)) {
int32_t numOfParams = (pItem->pNode->pParam == NULL)? 0: (int32_t) taosArrayGetSize(pItem->pNode->pParam);
if (pItem->pNode->pParam == NULL ||
(functionId != TSDB_FUNC_LEASTSQR && functionId != TSDB_FUNC_DERIVATIVE && numOfParams != 1) ||
((functionId == TSDB_FUNC_LEASTSQR || functionId == TSDB_FUNC_DERIVATIVE) && numOfParams != 3)) {
/* no parameters or more than one parameter for function */
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
......@@ -2182,11 +2274,13 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
// 2. check if sql function can be applied on this column data type
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
SSchema* pSchema = tscGetTableColumnSchema(pTableMetaInfo->pTableMeta, index.columnIndex);
if (!IS_NUMERIC_TYPE(pSchema->type)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && functionId == TSDB_FUNC_DIFF) {
} else if (IS_UNSIGNED_NUMERIC_TYPE(pSchema->type) && (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
}
......@@ -2200,11 +2294,11 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
}
// set the first column ts for diff query
if (functionId == TSDB_FUNC_DIFF) {
if (functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
colIndex += 1;
SColumnIndex indexTS = {.tableIndex = index.tableIndex, .columnIndex = 0};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP, TSDB_KEYSIZE,
getNewResColId(pCmd), TSDB_KEYSIZE, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &indexTS, TSDB_DATA_TYPE_TIMESTAMP,
TSDB_KEYSIZE, getNewResColId(pCmd), TSDB_KEYSIZE, false);
SColumnList ids = createColumnList(1, 0, 0);
insertResultField(pQueryInfo, 0, &ids, TSDB_KEYSIZE, TSDB_DATA_TYPE_TIMESTAMP, aAggs[TSDB_FUNC_TS_DUMMY].name, pExpr);
......@@ -2230,12 +2324,29 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return TSDB_CODE_TSC_INVALID_OPERATION;
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, sizeof(double));
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_DOUBLE, DOUBLE_BYTES);
} else if (functionId == TSDB_FUNC_IRATE) {
STableComInfo info = tscGetTableInfo(pTableMetaInfo->pTableMeta);
int64_t prec = info.precision;
tscExprAddParams(&pExpr->base, (char*)&prec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
} else if (functionId == TSDB_FUNC_DERIVATIVE) {
char val[8] = {0};
int64_t tickPerSec = 0;
if (tVariantDump(&pParamElem[1].pNode->value, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (info.precision == TSDB_TIME_PRECISION_MILLI) {
tickPerSec /= 1000;
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
}
SColumnList ids = createColumnList(1, index.tableIndex, index.columnIndex);
......@@ -2935,8 +3046,8 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
}
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "TWA not allowed to apply to super table directly";
const char* msg2 = "TWA only support group by tbname for super table query";
const char* msg1 = "TWA/Diff not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff only support group by tbname for super table query";
const char* msg3 = "function not support for super table query";
// filter sql function not supported by metric query yet.
......@@ -2949,7 +3060,7 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
}
}
if (tscIsTWAQuery(pQueryInfo)) {
if (tscIsTWAQuery(pQueryInfo) || tscIsDiffQuery(pQueryInfo)) {
if (pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return true;
......@@ -3207,6 +3318,25 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
retVal = tVariantDump(&pRight->value, (char*)&pColumnFilter->upperBndd, colType, false);
// TK_GT,TK_GE,TK_EQ,TK_NE are based on the pColumn->lowerBndd
} else if (pExpr->tokenId == TK_IN) {
tVariant *pVal;
if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->pParam, &pVal, colType) || colType == TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
if (validateParamOfRelationIn(pVal, colType) != TSDB_CODE_SUCCESS) {
tVariantDestroy(pVal);
free(pVal);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1);
pColumnFilter->len = pVal->nLen;
pColumnFilter->filterstr = 1;
memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen);
//retVal = tVariantDump(pVal, (char *)(pColumnFilter->pz), TSDB_DATA_TYPE_BINARY, false);
tVariantDestroy(pVal);
free(pVal);
} else if (colType == TSDB_DATA_TYPE_BINARY) {
pColumnFilter->pz = (int64_t)calloc(1, bufLen * TSDB_NCHAR_SIZE);
pColumnFilter->len = pRight->value.nLen;
......@@ -3255,6 +3385,9 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
case TK_NOTNULL:
pColumnFilter->lowerRelOptr = TSDB_RELATION_NOTNULL;
break;
case TK_IN:
pColumnFilter->lowerRelOptr = TSDB_RELATION_IN;
break;
default:
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
......@@ -3370,7 +3503,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
&& pExpr->tokenId != TK_ISNULL
&& pExpr->tokenId != TK_NOTNULL
&& pExpr->tokenId != TK_LIKE
) {
&& pExpr->tokenId != TK_IN) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
} else {
......@@ -3380,7 +3513,7 @@ static int32_t extractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SC
if (pSchema->type == TSDB_DATA_TYPE_BOOL) {
int32_t t = pExpr->tokenId;
if (t != TK_EQ && t != TK_NE && t != TK_NOTNULL && t != TK_ISNULL) {
if (t != TK_EQ && t != TK_NE && t != TK_NOTNULL && t != TK_ISNULL && t != TK_IN) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
......@@ -4422,7 +4555,11 @@ static int32_t validateTagCondExpr(SSqlCmd* pCmd, tExprNode *p) {
free(tmp);
} else {
double tmp;
retVal = tVariantDump(vVariant, (char*)&tmp, schemaType, false);
if (p->_node.optr == TSDB_RELATION_IN) {
retVal = validateParamOfRelationIn(vVariant, schemaType);
} else {
retVal = tVariantDump(vVariant, (char*)&tmp, schemaType, false);
}
}
if (retVal != TSDB_CODE_SUCCESS) {
......@@ -6376,7 +6513,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
......@@ -6858,7 +6995,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
const char* msg6 = "from missing in subclause";
const char* msg7 = "time interval is required";
const char* msg8 = "the first column should be primary timestamp column";
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo->numOfTables == 1);
......@@ -7812,14 +7949,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
{ // set the query info
pQueryInfo->projectionQuery = tscIsProjectionQuery(pQueryInfo);
pQueryInfo->hasFilter = tscHasColumnFilter(pQueryInfo);
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryInfo->projectionQuery = tscIsProjectionQuery(pQueryInfo);
pQueryInfo->hasFilter = tscHasColumnFilter(pQueryInfo);
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
pQueryInfo->diffQuery = tscIsDiffQuery(pQueryInfo);
SExprInfo** p = NULL;
int32_t numOfExpr = 0;
......@@ -7930,6 +8068,24 @@ int32_t exprTreeFromSqlExpr(SSqlCmd* pCmd, tExprNode **pExpr, const tSqlExpr* pS
}
return TSDB_CODE_SUCCESS;
} else if (pSqlExpr->tokenId == TK_SET) {
int32_t type = -1;
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (pCols != NULL) {
SColIndex* idx = taosArrayGet(pCols, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, idx->colIndex);
if (pSchema != NULL) {
type = pSchema->type;
}
}
tVariant *pVal;
if (serializeExprListToVariant(pSqlExpr->pParam, &pVal, type) == false) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support filter expression");
}
*pExpr = calloc(1, sizeof(tExprNode));
(*pExpr)->nodeType = TSQL_NODE_VALUE;
(*pExpr)->pVal = pVal;
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support filter expression");
}
......
......@@ -1588,7 +1588,7 @@ int tscProcessLocalRetrieveRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, numOfRes);
}
int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd* pCmd = &pSql->cmd;
......@@ -1615,12 +1615,13 @@ int tscProcessRetrieveLocalMergeRsp(SSqlObj *pSql) {
taosArrayPush(group, &tableKeyInfo);
taosArrayPush(tableGroupInfo.pGroupList, &group);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE);
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute query processing", pSql->self, pSql->self);
pQueryInfo->pQInfo = createQInfoFromQueryNode(pQueryInfo, &tableGroupInfo, NULL, NULL, pRes->pMerger, MERGE_STAGE, pSql->self);
}
uint64_t localQueryId = 0;
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo);
convertQueryResult(pRes, pQueryInfo, pSql->self);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {
......@@ -2689,7 +2690,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_EMPTY_RESULT] = tscProcessEmptyResultRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_LOCALMERGE] = tscProcessRetrieveLocalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_RETRIEVE_GLOBALMERGE] = tscProcessRetrieveGlobalMergeRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
......
......@@ -456,7 +456,7 @@ static bool needToFetchNewBlock(SSqlObj* pSql) {
return (pRes->completed != true || hasMoreVnodesToTry(pSql) || hasMoreClauseToTry(pSql)) &&
(pCmd->command == TSDB_SQL_RETRIEVE ||
pCmd->command == TSDB_SQL_RETRIEVE_LOCALMERGE ||
pCmd->command == TSDB_SQL_RETRIEVE_GLOBALMERGE ||
pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE ||
pCmd->command == TSDB_SQL_FETCH ||
pCmd->command == TSDB_SQL_SHOW ||
......
......@@ -1977,9 +1977,8 @@ void tscHandleMasterJoinQuery(SSqlObj* pSql) {
}
memset(pSql->subState.states, 0, sizeof(*pSql->subState.states) * pSql->subState.numOfSub);
tscDebug("0x%"PRIx64" reset all sub states to 0", pSql->self);
tscDebug("0x%"PRIx64" reset all sub states to 0, start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
tscDebug("0x%"PRIx64" start subquery, total:%d", pSql->self, pQueryInfo->numOfTables);
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SJoinSupporter *pSupporter = tscCreateJoinSupporter(pSql, i);
if (pSupporter == NULL) { // failed to create support struct, abort current query
......@@ -2424,7 +2423,7 @@ int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
// pRes->code check only serves in launching metric sub-queries
if (pRes->code == TSDB_CODE_TSC_QUERY_CANCELLED) {
pCmd->command = TSDB_SQL_RETRIEVE_LOCALMERGE; // enable the abort of kill super table function.
pCmd->command = TSDB_SQL_RETRIEVE_GLOBALMERGE; // enable the abort of kill super table function.
return pRes->code;
}
......@@ -2780,7 +2779,7 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
if (code == TSDB_CODE_SUCCESS && trsupport->pExtMemBuffer == NULL) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_EMPTY_RESULT; // no result, set the result empty
} else {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_LOCALMERGE;
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
}
tscCreateResPointerInfo(&pParentSql->res, pPQueryInfo);
......@@ -3502,7 +3501,7 @@ static UNUSED_FUNC bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql) {
}
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pSourceOperator,
char* sql, void* merger, int32_t stage) {
char* sql, void* merger, int32_t stage, uint64_t qId) {
assert(pQueryInfo != NULL);
SQInfo *pQInfo = (SQInfo *)calloc(1, sizeof(SQInfo));
if (pQInfo == NULL) {
......@@ -3511,7 +3510,7 @@ void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, STableGroupInfo* pTableGr
// to make sure third party won't overwrite this structure
pQInfo->signature = pQInfo;
pQInfo->qId = qId;
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = &pQInfo->query;
......
......@@ -222,6 +222,8 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
functionId != TSDB_FUNC_TS &&
functionId != TSDB_FUNC_ARITHM &&
functionId != TSDB_FUNC_TS_COMP &&
functionId != TSDB_FUNC_DIFF &&
functionId != TSDB_FUNC_TS_DUMMY &&
functionId != TSDB_FUNC_TID_TAG) {
return false;
}
......@@ -434,6 +436,23 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
return false;
}
bool tscIsDiffQuery(SQueryInfo* pQueryInfo) {
size_t num = tscNumOfExprs(pQueryInfo);
for(int32_t i = 0; i < num; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr == NULL || pExpr->base.functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (pExpr->base.functionId == TSDB_FUNC_DIFF) {
return true;
}
}
return false;
}
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
return pQueryInfo->sessionWindow.gap > 0;
}
......@@ -467,42 +486,12 @@ bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
return false;
}
bool isSimpleAggregate(SQueryInfo* pQueryInfo) {
if (pQueryInfo->interval.interval > 0) {
return false;
}
// Note:top/bottom query is fixed output query
if (tscIsTopBotQuery(pQueryInfo) || tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
return true;
}
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
if (pExpr == NULL) {
continue;
}
int32_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
return true;
}
}
return false;
}
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
if (pQueryInfo->interval.interval > 0 || pQueryInfo->sessionWindow.gap > 0) {
return false;
}
if (tscGroupbyColumn(pQueryInfo) || isTsCompQuery(pQueryInfo)) {
if (tscIsDiffQuery(pQueryInfo)) {
return false;
}
......@@ -518,13 +507,13 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
continue;
}
if (!IS_MULTIOUTPUT(aAggs[functionId].status)) {
if ((!IS_MULTIOUTPUT(aAggs[functionId].status)) ||
(functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_TS_COMP)) {
return true;
}
}
return false;
}
bool isBlockDistQuery(SQueryInfo* pQueryInfo) {
......@@ -812,6 +801,7 @@ static void fetchNextBlockIfCompleted(SOperatorInfo* pOperator, bool* newgroup)
for (int32_t i = 0; i < pOperator->numOfUpstream; ++i) {
SJoinStatus* pStatus = &pJoinInfo->status[i];
if (pStatus->pBlock == NULL || pStatus->index >= pStatus->pBlock->info.rows) {
tscDebug("Retrieve nest query result, index:%d, total:%d", i, pOperator->numOfUpstream);
pStatus->pBlock = pOperator->upstream[i]->exec(pOperator->upstream[i], newgroup);
pStatus->index = 0;
......@@ -1056,7 +1046,7 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
......@@ -1066,6 +1056,7 @@ void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
tscSetResRawPtrRv(pRes, pQueryInfo, p);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
pRes->row = 0;
pRes->completed = (pRes->numOfRows == 0);
}
......@@ -1088,7 +1079,9 @@ static void createInputDataFilterInfo(SQueryInfo* px, int32_t numOfCol1, int32_t
tfree(tableCols);
}
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput) {
void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pSql) {
SSqlRes* pOutput = &pSql->res;
// handle the following query process
if (px->pQInfo == NULL) {
SColumnInfo* pColumnInfo = extractColumnInfoFromResult(px->colList);
......@@ -1168,7 +1161,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
}
}
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN);
tscDebug("0x%"PRIx64" create QInfo 0x%"PRIx64" to execute the main query while all nest queries are ready", pSql->self, pSql->self);
px->pQInfo = createQInfoFromQueryNode(px, &tableGroupInfo, pSourceOperator, NULL, NULL, MASTER_SCAN, pSql->self);
tfree(pColumnInfo);
tfree(schema);
......@@ -1176,9 +1171,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
pSourceOperator->pRuntimeEnv = &px->pQInfo->runtimeEnv;
}
uint64_t qId = 0;
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px);
convertQueryResult(pOutput, px, pSql->self);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
......@@ -1374,7 +1369,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_LOCALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
cmd == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscRemoveFromSqlList(pSql);
}
......@@ -2177,6 +2172,7 @@ size_t tscNumOfExprs(SQueryInfo* pQueryInfo) {
return taosArrayGetSize(pQueryInfo->exprList);
}
// todo REFACTOR
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes) {
assert (pExpr != NULL || argument != NULL || bytes != 0);
......@@ -3278,7 +3274,6 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
if (pNewQueryInfo->buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -3438,7 +3433,7 @@ void doRetrieveSubqueryData(SSchedMsg *pMsg) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, &pSql->res);
handleDownstreamOperator(pSql->pSubs, pSql->subState.numOfSub, pQueryInfo, pSql);
pSql->res.qId = -1;
if (pSql->res.code == TSDB_CODE_SUCCESS) {
......@@ -3468,13 +3463,12 @@ static void tscSubqueryRetrieveCallback(void* param, TAOS_RES* tres, int code) {
}
pParentSql->cmd.active = pParentSql->cmd.pQueryInfo;
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pParentSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
pParentSql->res.qId = -1;
if (pSql->res.code == TSDB_CODE_SUCCESS) {
(*pSql->fp)(pParentSql->param, pParentSql, pParentSql->res.numOfRows);
} else {
tscAsyncResultOnError(pParentSql);
}
}
// todo handle the failure
......@@ -4238,7 +4232,8 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
pQueryAttr->stabledev = isStabledev(pQueryInfo);
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
pQueryAttr->diffQuery = tscIsDiffQuery(pQueryInfo);
pQueryAttr->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
......@@ -4257,7 +4252,6 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
} else {
......
......@@ -76,7 +76,7 @@ enum {
// SQL below for client local
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DESCRIBE_TABLE, "describe-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_LOCALMERGE, "retrieve-localmerge" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_GLOBALMERGE, "retrieve-globalmerge" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_TABLE_JOIN_RETRIEVE, "join-retrieve" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_SHOW_CREATE_TABLE, "show-create-table")
......
......@@ -94,6 +94,8 @@ bool exprTreeApplyFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len);
#ifdef __cplusplus
}
#endif
......
......@@ -466,6 +466,32 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
return expr;
}
void buildFilterSetFromBinary(void **q, const char *buf, int32_t len) {
SBufferReader br = tbufInitReader(buf, len, false);
uint32_t type = tbufReadUint32(&br);
SHashObj *pObj = taosHashInit(256, taosGetDefaultHashFunction(type), true, false);
int dummy = -1;
int32_t sz = tbufReadInt32(&br);
for (int32_t i = 0; i < sz; i++) {
if (type == TSDB_DATA_TYPE_BOOL || type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_SMALLINT || type == TSDB_DATA_TYPE_BIGINT || type == TSDB_DATA_TYPE_INT) {
int64_t val = tbufReadInt64(&br);
taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy));
} else if (type == TSDB_DATA_TYPE_DOUBLE || type == TSDB_DATA_TYPE_FLOAT) {
double val = tbufReadDouble(&br);
taosHashPut(pObj, (char *)&val, sizeof(val), &dummy, sizeof(dummy));
} else if (type == TSDB_DATA_TYPE_BINARY) {
size_t t = 0;
const char *val = tbufReadBinary(&br, &t);
taosHashPut(pObj, (char *)val, t, &dummy, sizeof(dummy));
} else if (type == TSDB_DATA_TYPE_NCHAR) {
size_t t = 0;
const char *val = tbufReadBinary(&br, &t);
taosHashPut(pObj, (char *)val, t, &dummy, sizeof(dummy));
}
}
*q = (void *)pObj;
}
tExprNode* exprdup(tExprNode* pNode) {
if (pNode == NULL) {
return NULL;
......
......@@ -8,10 +8,9 @@ IF (TD_MVN_INSTALLED)
ADD_CUSTOM_COMMAND(OUTPUT ${JDBC_CMD_NAME}
POST_BUILD
COMMAND mvn -Dmaven.test.skip=true install -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.29.jar ${LIBRARY_OUTPUT_PATH}
COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/target/taos-jdbcdriver-2.0.30.jar ${LIBRARY_OUTPUT_PATH}
COMMAND mvn -Dmaven.test.skip=true clean -f ${CMAKE_CURRENT_SOURCE_DIR}/pom.xml
COMMENT "build jdbc driver")
ADD_CUSTOM_TARGET(${JDBC_TARGET_NAME} ALL WORKING_DIRECTORY ${EXECUTABLE_OUTPUT_PATH} DEPENDS ${JDBC_CMD_NAME})
ENDIF ()
......@@ -5,7 +5,7 @@
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.29</version>
<version>2.0.30</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
......
......@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.29</version>
<version>2.0.30</version>
<packaging>jar</packaging>
<name>JDBCDriver</name>
<url>https://github.com/taosdata/TDengine/tree/master/src/connector/jdbc</url>
......
package com.taosdata.jdbc;
import com.sun.org.apache.xpath.internal.operations.Bool;
import java.sql.ParameterMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
......@@ -49,6 +51,22 @@ public abstract class AbstractParameterMetaData extends WrapperImpl implements P
if (param < 1 && param >= parameters.length)
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE);
if (parameters[param - 1] instanceof Boolean)
return TSDBConstants.BOOLEAN_PRECISION;
if (parameters[param - 1] instanceof Byte)
return TSDBConstants.TINYINT_PRECISION;
if (parameters[param - 1] instanceof Short)
return TSDBConstants.SMALLINT_PRECISION;
if (parameters[param - 1] instanceof Integer)
return TSDBConstants.INT_PRECISION;
if (parameters[param - 1] instanceof Long)
return TSDBConstants.BIGINT_PRECISION;
if (parameters[param - 1] instanceof Timestamp)
return TSDBConstants.TIMESTAMP_MS_PRECISION;
if (parameters[param - 1] instanceof Float)
return TSDBConstants.FLOAT_PRECISION;
if (parameters[param - 1] instanceof Double)
return TSDBConstants.DOUBLE_PRECISION;
if (parameters[param - 1] instanceof String)
return ((String) parameters[param - 1]).length();
if (parameters[param - 1] instanceof byte[])
......@@ -60,6 +78,11 @@ public abstract class AbstractParameterMetaData extends WrapperImpl implements P
public int getScale(int param) throws SQLException {
if (param < 1 && param >= parameters.length)
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_PARAMETER_INDEX_OUT_RANGE);
if (parameters[param - 1] instanceof Float)
return TSDBConstants.FLOAT_SCALE;
if (parameters[param - 1] instanceof Double)
return TSDBConstants.DOUBLE_SCALE;
return 0;
}
......
......@@ -66,10 +66,16 @@ public abstract class AbstractResultSet extends WrapperImpl implements ResultSet
public abstract byte[] getBytes(int columnIndex) throws SQLException;
@Override
public abstract Date getDate(int columnIndex) throws SQLException;
public Date getDate(int columnIndex) throws SQLException {
Timestamp timestamp = getTimestamp(columnIndex);
return timestamp == null ? null : new Date(timestamp.getTime());
}
@Override
public abstract Time getTime(int columnIndex) throws SQLException;
public Time getTime(int columnIndex) throws SQLException {
Timestamp timestamp = getTimestamp(columnIndex);
return timestamp == null ? null : new Time(timestamp.getTime());
}
@Override
public abstract Timestamp getTimestamp(int columnIndex) throws SQLException;
......
......@@ -41,15 +41,15 @@ public abstract class TSDBConstants {
public static final int TSDB_DATA_TYPE_BINARY = 8;
public static final int TSDB_DATA_TYPE_TIMESTAMP = 9;
public static final int TSDB_DATA_TYPE_NCHAR = 10;
/*
系统增加新的无符号数据类型,分别是:
unsigned tinyint, 数值范围:0-254, NULL 为255
unsigned smallint,数值范围: 0-65534, NULL 为65535
unsigned int,数值范围:0-4294967294,NULL 为4294967295u
unsigned bigint,数值范围:0-18446744073709551614u,NULL 为18446744073709551615u。
example:
create table tb(ts timestamp, a tinyint unsigned, b smallint unsigned, c int unsigned, d bigint unsigned);
*/
/**
* 系统增加新的无符号数据类型,分别是:
* unsigned tinyint, 数值范围:0-254, NULL 为255
* unsigned smallint,数值范围: 0-65534, NULL 为65535
* unsigned int,数值范围:0-4294967294,NULL 为4294967295u
* unsigned bigint,数值范围:0-18446744073709551614u,NULL 为18446744073709551615u。
* example:
* create table tb(ts timestamp, a tinyint unsigned, b smallint unsigned, c int unsigned, d bigint unsigned);
*/
public static final int TSDB_DATA_TYPE_UTINYINT = 11; //unsigned tinyint
public static final int TSDB_DATA_TYPE_USMALLINT = 12; //unsigned smallint
public static final int TSDB_DATA_TYPE_UINT = 13; //unsigned int
......@@ -57,6 +57,47 @@ public abstract class TSDBConstants {
// nchar column max length
public static final int maxFieldSize = 16 * 1024;
// precision for data types
public static final int BOOLEAN_PRECISION = 1;
public static final int TINYINT_PRECISION = 4;
public static final int SMALLINT_PRECISION = 6;
public static final int INT_PRECISION = 11;
public static final int BIGINT_PRECISION = 20;
public static final int FLOAT_PRECISION = 12;
public static final int DOUBLE_PRECISION = 22;
public static final int TIMESTAMP_MS_PRECISION = 23;
public static final int TIMESTAMP_US_PRECISION = 26;
// scale for data types
public static final int FLOAT_SCALE = 31;
public static final int DOUBLE_SCALE = 31;
public static int typeName2JdbcType(String type) {
switch (type.toUpperCase()) {
case "TIMESTAMP":
return Types.TIMESTAMP;
case "INT":
return Types.INTEGER;
case "BIGINT":
return Types.BIGINT;
case "FLOAT":
return Types.FLOAT;
case "DOUBLE":
return Types.DOUBLE;
case "BINARY":
return Types.BINARY;
case "SMALLINT":
return Types.SMALLINT;
case "TINYINT":
return Types.TINYINT;
case "BOOL":
return Types.BOOLEAN;
case "NCHAR":
return Types.NCHAR;
default:
return Types.NULL;
}
}
public static int taosType2JdbcType(int taosType) throws SQLException {
switch (taosType) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
......@@ -88,7 +129,7 @@ public abstract class TSDBConstants {
}
public static String taosType2JdbcTypeName(int taosType) throws SQLException {
switch (taosType){
switch (taosType) {
case TSDBConstants.TSDB_DATA_TYPE_BOOL:
return "BOOL";
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
......@@ -119,7 +160,7 @@ public abstract class TSDBConstants {
}
public static int jdbcType2TaosType(int jdbcType) throws SQLException {
switch (jdbcType){
switch (jdbcType) {
case Types.BOOLEAN:
return TSDBConstants.TSDB_DATA_TYPE_BOOL;
case Types.TINYINT:
......@@ -145,7 +186,7 @@ public abstract class TSDBConstants {
}
public static String jdbcType2TaosTypeName(int jdbcType) throws SQLException {
switch (jdbcType){
switch (jdbcType) {
case Types.BOOLEAN:
return "BOOL";
case Types.TINYINT:
......
......@@ -16,13 +16,13 @@
*/
package com.taosdata.jdbc;
import com.taosdata.jdbc.utils.TaosInfo;
import java.nio.ByteBuffer;
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.util.List;
import com.taosdata.jdbc.utils.TaosInfo;
/**
* JNI connector
*/
......@@ -30,10 +30,10 @@ public class TSDBJNIConnector {
private static volatile Boolean isInitialized = false;
private TaosInfo taosInfo = TaosInfo.getInstance();
// Connection pointer used in C
private long taos = TSDBConstants.JNI_NULL_POINTER;
// result set status in current connection
private boolean isResultsetClosed;
......@@ -194,7 +194,9 @@ public class TSDBJNIConnector {
* Get schema metadata
*/
public int getSchemaMetaData(long resultSet, List<ColumnMetaData> columnMetaData) {
return this.getSchemaMetaDataImp(this.taos, resultSet, columnMetaData);
int ret = this.getSchemaMetaDataImp(this.taos, resultSet, columnMetaData);
columnMetaData.stream().forEach(column -> column.setColIndex(column.getColIndex() + 1));
return ret;
}
private native int getSchemaMetaDataImp(long connection, long resultSet, List<ColumnMetaData> columnMetaData);
......@@ -221,7 +223,7 @@ public class TSDBJNIConnector {
*/
public void closeConnection() throws SQLException {
int code = this.closeConnectionImp(this.taos);
if (code < 0) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
} else if (code == 0) {
......@@ -229,7 +231,7 @@ public class TSDBJNIConnector {
} else {
throw new SQLException("Undefined error code returned by TDengine when closing a connection");
}
// invoke closeConnectionImpl only here
taosInfo.connect_close_increment();
}
......@@ -274,67 +276,76 @@ public class TSDBJNIConnector {
}
private native int validateCreateTableSqlImp(long connection, byte[] sqlBytes);
public long prepareStmt(String sql) throws SQLException {
Long stmt = prepareStmtImp(sql.getBytes(), this.taos);
if (stmt == TSDBConstants.JNI_TDENGINE_ERROR) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_SQL);
} else if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
public long prepareStmt(String sql) throws SQLException {
Long stmt;
try {
stmt = prepareStmtImp(sql.getBytes(), this.taos);
} catch (Exception e) {
e.printStackTrace();
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNSUPPORTED_ENCODING);
}
if (stmt == TSDBConstants.JNI_CONNECTION_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
} else if (stmt == TSDBConstants.JNI_SQL_NULL) {
}
if (stmt == TSDBConstants.JNI_SQL_NULL) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_SQL_NULL);
} else if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
}
if (stmt == TSDBConstants.JNI_OUT_OF_MEMORY) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_OUT_OF_MEMORY);
}
return stmt;
return stmt;
}
private native long prepareStmtImp(byte[] sql, long con);
public void setBindTableName(long stmt, String tableName) throws SQLException {
int code = setBindTableNameImp(stmt, tableName, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
int code = setBindTableNameImp(stmt, tableName, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to set table name");
}
}
}
}
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
}
private native int setTableNameTagsImp(long stmt, String name, int numOfTags, byte[] tags, byte[] typeList, byte[] lengthList, byte[] nullList, long conn);
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows, int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind column data");
}
}
}
}
private native int bindColDataImp(long stmt, byte[] colDataList, byte[] lengthList, byte[] isNullList, int type, int bytes, int numOfRows, int columnIndex, long conn);
public void executeBatch(long stmt) throws SQLException {
int code = executeBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
int code = executeBatchImp(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to execute batch bind");
}
}
}
private native int executeBatchImp(long stmt, long con);
public void closeBatch(long stmt) throws SQLException {
int code = closeStmt(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
int code = closeStmt(stmt, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to close batch bind");
}
}
}
private native int closeStmt(long stmt, long con);
}
......@@ -113,6 +113,7 @@ public class TSDBResultSetMetaData extends WrapperImpl implements ResultSetMetaD
ColumnMetaData columnMetaData = this.colMetaDataList.get(column - 1);
switch (columnMetaData.getColType()) {
case TSDBConstants.TSDB_DATA_TYPE_FLOAT:
return 5;
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE:
......
......@@ -32,14 +32,15 @@ public class TSDBStatement extends AbstractStatement {
}
public ResultSet executeQuery(String sql) throws SQLException {
// check if closed
if (isClosed()) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_STATEMENT_CLOSED);
}
//TODO: 如果在executeQuery方法中执行insert语句,那么先执行了SQL,再通过pSql来检查是否为一个insert语句,但这个insert SQL已经执行成功了
// execute query
//TODO:
// this is an unreasonable implementation, if the paratemer is a insert statement,
// the JNI connector will execute the sql at first and return a pointer: pSql,
// we use this pSql and invoke the isUpdateQuery(long pSql) method to decide .
// but the insert sql is already executed in database.
//execute query
long pSql = this.connection.getConnector().executeQuery(sql);
// if pSql is create/insert/update/delete/alter SQL
if (this.connection.getConnector().isUpdateQuery(pSql)) {
......
......@@ -95,16 +95,7 @@ public class Utils {
public static String getNativeSql(String rawSql, Object[] parameters) {
// toLowerCase
String preparedSql = rawSql.trim().toLowerCase();
String[] clause = new String[0];
if (SqlSyntaxValidator.isInsertSql(preparedSql)) {
// insert or import
clause = new String[]{"values\\s*\\(.*?\\)", "tags\\s*\\(.*?\\)"};
}
if (SqlSyntaxValidator.isSelectSql(preparedSql)) {
// select
clause = new String[]{"where\\s*.*"};
}
String[] clause = new String[]{"values\\s*\\(.*?\\)", "tags\\s*\\(.*?\\)", "where\\s*.*"};
Map<Integer, Integer> placeholderPositions = new HashMap<>();
RangeSet<Integer> clauseRangeSet = TreeRangeSet.create();
findPlaceholderPosition(preparedSql, placeholderPositions);
......
......@@ -45,9 +45,9 @@ public class TSDBJNIConnectorTest {
rowData = new TSDBResultSetRowData(columnSize);
// iterate resultSet
for (int i = 0; next(connector, pSql); i++) {
System.out.println("col[" + i + "] size: " + rowData.getColSize());
rowData.getData().stream().forEach(col -> System.out.print(col + "\t"));
System.out.println();
// System.out.println("col[" + i + "] size: " + rowData.getColSize());
// rowData.getData().stream().forEach(col -> System.out.print(col + "\t"));
// System.out.println();
}
// close resultSet
code = connector.freeResultSet(pSql);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册