提交 a4e6c27f 编写于 作者: haoranc's avatar haoranc

Merge branch 'develop' of github.com:taosdata/TDengine into dev/chr

...@@ -83,6 +83,8 @@ IF (TD_ARM_64) ...@@ -83,6 +83,8 @@ IF (TD_ARM_64)
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm64 is defined") MESSAGE(STATUS "arm64 is defined")
SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE") SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF () ENDIF ()
IF (TD_ARM_32) IF (TD_ARM_32)
...@@ -91,6 +93,8 @@ IF (TD_ARM_32) ...@@ -91,6 +93,8 @@ IF (TD_ARM_32)
ADD_DEFINITIONS(-DUSE_LIBICONV) ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "arm32 is defined") MESSAGE(STATUS "arm32 is defined")
SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ") SET(COMMON_FLAGS "-Wall -Werror -fPIC -fsigned-char -fpack-struct=8 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE -Wno-pointer-to-int-cast -Wno-int-to-pointer-cast -Wno-incompatible-pointer-types ")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF () ENDIF ()
IF (TD_MIPS_64) IF (TD_MIPS_64)
...@@ -143,6 +147,7 @@ IF (TD_LINUX) ...@@ -143,6 +147,7 @@ IF (TD_LINUX)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF () ENDIF ()
IF (TD_DARWIN_64) IF (TD_DARWIN_64)
...@@ -164,6 +169,7 @@ IF (TD_DARWIN_64) ...@@ -164,6 +169,7 @@ IF (TD_DARWIN_64)
SET(RELEASE_FLAGS "-Og") SET(RELEASE_FLAGS "-Og")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF () ENDIF ()
IF (TD_WINDOWS) IF (TD_WINDOWS)
...@@ -194,6 +200,7 @@ IF (TD_WINDOWS) ...@@ -194,6 +200,7 @@ IF (TD_WINDOWS)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/regex) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/regex)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/wepoll/inc) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/wepoll/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MsvcLibX/include) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/MsvcLibX/include)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lua/src)
ENDIF () ENDIF ()
IF (TD_WINDOWS_64) IF (TD_WINDOWS_64)
......
...@@ -37,13 +37,13 @@ ELSEIF (TD_DARWIN) ...@@ -37,13 +37,13 @@ ELSEIF (TD_DARWIN)
# set the static lib name # set the static lib name
ADD_LIBRARY(taos_static STATIC ${SRC}) ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m) TARGET_LINK_LIBRARIES(taos_static common query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1) SET_TARGET_PROPERTIES(taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1)
# generate dynamic library (*.dylib) # generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC}) ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m) TARGET_LINK_LIBRARIES(taos common query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1) SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
#set version of .dylib #set version of .dylib
...@@ -68,19 +68,19 @@ ELSEIF (TD_WINDOWS) ...@@ -68,19 +68,19 @@ ELSEIF (TD_WINDOWS)
IF (NOT TD_GODLL) IF (NOT TD_GODLL)
SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def) SET_TARGET_PROPERTIES(taos PROPERTIES LINK_FLAGS /DEF:${TD_COMMUNITY_DIR}/src/client/src/taos.def)
ENDIF () ENDIF ()
TARGET_LINK_LIBRARIES(taos trpc tutil query) TARGET_LINK_LIBRARIES(taos trpc tutil query lua)
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
SET(CMAKE_MACOSX_RPATH 1) SET(CMAKE_MACOSX_RPATH 1)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux) INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/jni/linux)
ADD_LIBRARY(taos_static STATIC ${SRC}) ADD_LIBRARY(taos_static STATIC ${SRC})
TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m) TARGET_LINK_LIBRARIES(taos_static query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static") SET_TARGET_PROPERTIES(taos_static PROPERTIES OUTPUT_NAME "taos_static")
# generate dynamic library (*.dylib) # generate dynamic library (*.dylib)
ADD_LIBRARY(taos SHARED ${SRC}) ADD_LIBRARY(taos SHARED ${SRC})
TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m) TARGET_LINK_LIBRARIES(taos query trpc tutil pthread m lua)
SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1) SET_TARGET_PROPERTIES(taos PROPERTIES CLEAN_DIRECT_OUTPUT 1)
......
...@@ -107,6 +107,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint ...@@ -107,6 +107,7 @@ SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint
uint32_t offset); uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList); void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyUdfArrayList(SArray* pUdfList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta); void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock); int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
...@@ -261,6 +262,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo); ...@@ -261,6 +262,7 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo); int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo); int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists); int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
int32_t tscGetUdfFromNode(SSqlObj *pSql, SQueryInfo* pQueryInfo);
void tscResetForNextRetrieve(SSqlRes* pRes); void tscResetForNextRetrieve(SSqlRes* pRes);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo); void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
...@@ -307,10 +309,9 @@ bool hasMoreClauseToTry(SSqlObj* pSql); ...@@ -307,10 +309,9 @@ bool hasMoreClauseToTry(SSqlObj* pSql);
void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta); void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp); void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp); void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet); int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp); int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, SArray* pUdfList, __async_cb_func_t fp, bool metaClone);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray); int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
......
...@@ -146,6 +146,7 @@ typedef struct { ...@@ -146,6 +146,7 @@ typedef struct {
SInsertStatementParam insertParam; SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32 char reserve1[3]; // fix bus error on arm32
int32_t count; // todo remove it int32_t count; // todo remove it
bool subCmd;
char reserve2[3]; // fix bus error on arm32 char reserve2[3]; // fix bus error on arm32
int16_t numOfCols; int16_t numOfCols;
......
...@@ -602,6 +602,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -602,6 +602,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
} else { } else {
...@@ -610,6 +619,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -610,6 +619,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pCtx[j]); aAggs[functionId].xFinalize(&pCtx[j]);
} }
...@@ -626,7 +644,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -626,7 +644,11 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
} }
for(int32_t j = 0; j < numOfExpr; ++j) { for(int32_t j = 0; j < numOfExpr; ++j) {
aAggs[pCtx[j].functionId].init(&pCtx[j]); if (pCtx[j].functionId < 0) {
continue;
}
aAggs[pCtx[j].functionId].init(&pCtx[j], pCtx[j].resultInfo);
} }
for (int32_t j = 0; j < numOfExpr; ++j) { for (int32_t j = 0; j < numOfExpr; ++j) {
...@@ -638,6 +660,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -638,6 +660,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
} }
...@@ -651,6 +682,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD ...@@ -651,6 +682,15 @@ static void doExecuteFinalMerge(SOperatorInfo* pOperator, int32_t numOfExpr, SSD
if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TAG_DUMMY || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_MERGE);
continue;
}
aAggs[functionId].mergeFunc(&pCtx[j]); aAggs[functionId].mergeFunc(&pCtx[j]);
} }
} }
...@@ -871,7 +911,6 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -871,7 +911,6 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{ {
if (pAggInfo->hasDataBlockForNewGroup) { if (pAggInfo->hasDataBlockForNewGroup) {
pAggInfo->binfo.pRes->info.rows = 0;
pAggInfo->hasPrev = false; // now we start from a new group data set. pAggInfo->hasPrev = false; // now we start from a new group data set.
// not belongs to the same group, return the result of current group; // not belongs to the same group, return the result of current group;
...@@ -880,7 +919,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -880,7 +919,13 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
{ // reset output buffer { // reset output buffer
for(int32_t j = 0; j < pOperator->numOfOutput; ++j) { for(int32_t j = 0; j < pOperator->numOfOutput; ++j) {
aAggs[pAggInfo->binfo.pCtx[j].functionId].init(&pAggInfo->binfo.pCtx[j]); SQLFunctionCtx* pCtx = &pAggInfo->binfo.pCtx[j];
if (pCtx->functionId < 0) {
clearOutputBuf(&pAggInfo->binfo, &pAggInfo->bufCapacity);
continue;
}
aAggs[pCtx->functionId].init(pCtx, pCtx->resultInfo);
} }
} }
...@@ -933,6 +978,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) { ...@@ -933,6 +978,14 @@ SSDataBlock* doGlobalAggregate(void* param, bool* newgroup) {
continue; continue;
} }
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pAggInfo->udfInfo, -1 * functionId - 1);
doInvokeUdf(pUdfInfo, &pAggInfo->binfo.pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
continue;
}
aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]); aAggs[functionId].xFinalize(&pAggInfo->binfo.pCtx[j]);
} }
......
此差异已折叠。
此差异已折叠。
...@@ -566,7 +566,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) { ...@@ -566,7 +566,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if ((pQueryInfo == NULL) || pQueryInfo->globalMerge) {
return true; return true;
} }
...@@ -679,7 +679,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) { ...@@ -679,7 +679,7 @@ static void tscKillSTableQuery(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (!pQueryInfo->globalMerge) {
return; return;
} }
...@@ -730,7 +730,7 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -730,7 +730,7 @@ void taos_stop_query(TAOS_RES *res) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd); SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) { if (pQueryInfo->globalMerge) {
assert(pSql->rpcRid <= 0); assert(pSql->rpcRid <= 0);
tscKillSTableQuery(pSql); tscKillSTableQuery(pSql);
} else { } else {
...@@ -945,30 +945,35 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -945,30 +945,35 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return TSDB_CODE_TSC_DISCONNECTED; return TSDB_CODE_TSC_DISCONNECTED;
} }
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->signature = pSql;
pSql->fp = NULL; // todo set the correct callback function pointer
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t length = (int32_t)strlen(tableNameList); int32_t length = (int32_t)strlen(tableNameList);
if (length == 0) {
return TSDB_CODE_SUCCESS;
}
if (length > MAX_TABLE_NAME_LENGTH) { if (length > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH); tscError("tableNameList too long, length:%d, maximum allowed:%d", length, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
char *str = calloc(1, length + 1); char *str = calloc(1, length + 1);
if (str == NULL) { if (str == NULL) {
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self); tscError("failed to allocate sql string buffer, size:%d", length);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
strtolower(str, tableNameList); strtolower(str, tableNameList);
SArray* plist = taosArrayInit(4, POINTER_BYTES); SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES); SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
if (plist == NULL || vgroupList == NULL) {
tfree(str);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
tscAllocPayload(&pSql->cmd, 1024);
pSql->pTscObj = taos;
pSql->signature = pSql;
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist); int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str); free(str);
...@@ -978,10 +983,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) { ...@@ -978,10 +983,11 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code; return code;
} }
pSql->cmd.pTableMetaMap = taosHashInit(taosArrayGetSize(plist), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
registerSqlObj(pSql); registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj); tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback); code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, NULL, loadMultiTableMetaCallback, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
} }
......
...@@ -23,6 +23,7 @@ ...@@ -23,6 +23,7 @@
#include "tscSubquery.h" #include "tscSubquery.h"
#include "qTableMeta.h" #include "qTableMeta.h"
#include "tsclient.h" #include "tsclient.h"
#include "qUdf.h"
#include "qUtil.h" #include "qUtil.h"
#include "qPlan.h" #include "qPlan.h"
...@@ -32,7 +33,7 @@ typedef struct SInsertSupporter { ...@@ -32,7 +33,7 @@ typedef struct SInsertSupporter {
} SInsertSupporter; } SInsertSupporter;
static void freeJoinSubqueryObj(SSqlObj* pSql); static void freeJoinSubqueryObj(SSqlObj* pSql);
static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql); //static bool tscHasRemainDataInSubqueryResultSet(SSqlObj *pSql);
static int32_t tsCompare(int32_t order, int64_t left, int64_t right) { static int32_t tsCompare(int32_t order, int64_t left, int64_t right) {
if (left == right) { if (left == right) {
...@@ -107,6 +108,9 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) { ...@@ -107,6 +108,9 @@ bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx) {
subState->states[idx] = 1; subState->states[idx] = 1;
bool done = allSubqueryDone(pParentSql); bool done = allSubqueryDone(pParentSql);
if (!done) {
tscDebug("0x%"PRIx64" sub:%p,%d completed, total:%d", pParentSql->self, pSql, idx, pParentSql->subState.numOfSub);
}
pthread_mutex_unlock(&subState->mutex); pthread_mutex_unlock(&subState->mutex);
return done; return done;
} }
...@@ -416,7 +420,9 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -416,7 +420,9 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
} }
// tscFieldInfoClear(&pSupporter->fieldsInfo); // tscFieldInfoClear(&pSupporter->fieldsInfo);
if (pSupporter->fieldsInfo.internalField != NULL) {
taosArrayDestroy(pSupporter->fieldsInfo.internalField);
}
if (pSupporter->pTSBuf != NULL) { if (pSupporter->pTSBuf != NULL) {
tsBufDestroy(pSupporter->pTSBuf); tsBufDestroy(pSupporter->pTSBuf);
pSupporter->pTSBuf = NULL; pSupporter->pTSBuf = NULL;
...@@ -430,7 +436,8 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) { ...@@ -430,7 +436,8 @@ static void tscDestroyJoinSupporter(SJoinSupporter* pSupporter) {
} }
if (pSupporter->pVgroupTables != NULL) { if (pSupporter->pVgroupTables != NULL) {
taosArrayDestroy(pSupporter->pVgroupTables); //taosArrayDestroy(pSupporter->pVgroupTables);
tscFreeVgroupTableInfo(pSupporter->pVgroupTables);
pSupporter->pVgroupTables = NULL; pSupporter->pVgroupTables = NULL;
} }
...@@ -889,7 +896,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar ...@@ -889,7 +896,9 @@ static int32_t getIntersectionOfTableTuple(SQueryInfo* pQueryInfo, SSqlObj* pPar
tscDebug("Join %d - num:%d", i, p->num); tscDebug("Join %d - num:%d", i, p->num);
// sort according to the tag valu // sort according to the tag valu
if (p->pIdTagList != NULL) {
qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar); qsort(p->pIdTagList, p->num, p->tagSize, tagValCompar);
}
if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) { if (!checkForDuplicateTagVal(pColSchema, p, pParentSql)) {
for (int32_t j = 0; j <= i; j++) { for (int32_t j = 0; j <= i; j++) {
...@@ -1173,7 +1182,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow ...@@ -1173,7 +1182,7 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
// no data exists in next vnode, mark the <tid, tags> query completed // no data exists in next vnode, mark the <tid, tags> query completed
// only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets. // only when there is no subquery exits any more, proceeds to get the intersect of the <tid, tags> tuple sets.
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub); //tscDebug("0x%"PRIx64" tagRetrieve:%p,%d completed, total:%d", pParentSql->self, tres, pSupporter->subqueryIndex, pParentSql->subState.numOfSub);
return; return;
} }
...@@ -1441,7 +1450,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR ...@@ -1441,7 +1450,7 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
} }
if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) { if (!subAndCheckDone(pSql, pParentSql, pSupporter->subqueryIndex)) {
tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub); //tscDebug("0x%"PRIx64" sub:0x%"PRIx64",%d completed, total:%d", pParentSql->self, pSql->self, pSupporter->subqueryIndex, pState->numOfSub);
return; return;
} }
...@@ -1888,7 +1897,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter ...@@ -1888,7 +1897,7 @@ int32_t tscCreateJoinSubquery(SSqlObj *pSql, int16_t tableIndex, SJoinSupporter
int16_t type = 0; int16_t type = 0;
int32_t inter = 0; int32_t inter = 0;
getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0); getResultDataInfo(s->type, s->bytes, TSDB_FUNC_TID_TAG, 0, &type, &bytes, &inter, 0, 0, NULL);
SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes}; SSchema s1 = {.colId = s->colId, .type = (uint8_t)type, .bytes = bytes};
pSupporter->tagSize = s1.bytes; pSupporter->tagSize = s1.bytes;
...@@ -2287,6 +2296,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) { ...@@ -2287,6 +2296,7 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
SArray* pColList = pNewQueryInfo->colList; SArray* pColList = pNewQueryInfo->colList;
pNewQueryInfo->colList = NULL; pNewQueryInfo->colList = NULL;
pNewQueryInfo->fillType = TSDB_FILL_NONE;
tscClearSubqueryInfo(pCmd); tscClearSubqueryInfo(pCmd);
tscFreeSqlResult(pSql); tscFreeSqlResult(pSql);
...@@ -2793,6 +2803,28 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p ...@@ -2793,6 +2803,28 @@ static void tscAllDataRetrievedFromDnode(SRetrieveSupport *trsupport, SSqlObj* p
tscFreeRetrieveSup(pSql); tscFreeRetrieveSup(pSql);
// set the command flag must be after the semaphore been correctly set.
if (pParentSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
pParentSql->cmd.command = TSDB_SQL_RETRIEVE_GLOBALMERGE;
SQueryInfo *pQueryInfo2 = tscGetQueryInfo(&pParentSql->cmd);
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t j = 0; j < size; ++j) {
SExprInfo* pExprInfo = tscExprGet(pQueryInfo2, j);
int32_t functionId = pExprInfo->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo2->pUdfInfo, -1 * functionId - 1);
code = initUdfInfo(pUdfInfo);
if (code != TSDB_CODE_SUCCESS) {
pParentSql->res.code = code;
tscAsyncResultOnError(pParentSql);
}
}
}
}
if (pParentSql->res.code == TSDB_CODE_SUCCESS) { if (pParentSql->res.code == TSDB_CODE_SUCCESS) {
(*pParentSql->fp)(pParentSql->param, pParentSql, 0); (*pParentSql->fp)(pParentSql->param, pParentSql, 0);
} else { } else {
...@@ -3049,7 +3081,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows) ...@@ -3049,7 +3081,8 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
} }
if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) { if (!subAndCheckDone(tres, pParentObj, pSupporter->index)) {
tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, pSupporter->index, pParentObj->subState.numOfSub); // concurrency problem, other thread already release pParentObj
//tscDebug("0x%"PRIx64" insert:%p,%d completed, total:%d", pParentObj->self, tres, suppIdx, pParentObj->subState.numOfSub);
return; return;
} }
......
...@@ -28,6 +28,7 @@ ...@@ -28,6 +28,7 @@
#include "tconfig.h" #include "tconfig.h"
#include "ttimezone.h" #include "ttimezone.h"
#include "tlocale.h" #include "tlocale.h"
#include "qScript.h"
// global, not configurable // global, not configurable
#define TSC_VAR_NOT_RELEASE 1 #define TSC_VAR_NOT_RELEASE 1
...@@ -148,6 +149,8 @@ void taos_init_imp(void) { ...@@ -148,6 +149,8 @@ void taos_init_imp(void) {
taosInitNotes(); taosInitNotes();
rpcInit(); rpcInit();
scriptEnvPoolInit();
tscDebug("starting to initialize TAOS client ..."); tscDebug("starting to initialize TAOS client ...");
tscDebug("Local End Point is:%s", tsLocalEp); tscDebug("Local End Point is:%s", tsLocalEp);
} }
...@@ -202,7 +205,9 @@ void taos_cleanup(void) { ...@@ -202,7 +205,9 @@ void taos_cleanup(void) {
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) { if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
return; return;
} }
if (tscEmbedded == 0) {
scriptEnvPoolCleanup();
}
taosHashCleanup(tscTableMetaInfo); taosHashCleanup(tscTableMetaInfo);
tscTableMetaInfo = NULL; tscTableMetaInfo = NULL;
......
...@@ -216,6 +216,15 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) { ...@@ -216,6 +216,15 @@ bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex) {
for (int32_t i = 0; i < numOfExprs; ++i) { for (int32_t i = 0; i < numOfExprs; ++i) {
int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId; int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
continue;
}
if (functionId != TSDB_FUNC_PRJ && if (functionId != TSDB_FUNC_PRJ &&
functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAGPRJ &&
functionId != TSDB_FUNC_TAG && functionId != TSDB_FUNC_TAG &&
...@@ -266,6 +275,16 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) { ...@@ -266,6 +275,16 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
f != TSDB_FUNC_DERIVATIVE) { f != TSDB_FUNC_DERIVATIVE) {
return false; return false;
} }
if (f < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * f - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return false;
}
continue;
}
} }
return true; return true;
...@@ -297,7 +316,7 @@ bool tscHasColumnFilter(SQueryInfo* pQueryInfo) { ...@@ -297,7 +316,7 @@ bool tscHasColumnFilter(SQueryInfo* pQueryInfo) {
size_t size = taosArrayGetSize(pQueryInfo->colList); size_t size = taosArrayGetSize(pQueryInfo->colList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
SColumn* pCol = taosArrayGet(pQueryInfo->colList, i); SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (pCol->info.flist.numOfFilters > 0) { if (pCol->info.flist.numOfFilters > 0) {
return true; return true;
} }
...@@ -338,6 +357,10 @@ bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) { ...@@ -338,6 +357,10 @@ bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo) {
} }
} }
if (tscIsProjectionQuery(pQueryInfo)) {
return false;
}
return false; return false;
} }
...@@ -526,6 +549,15 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) { ...@@ -526,6 +549,15 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo) {
} }
int32_t functionId = pExpr->base.functionId; int32_t functionId = pExpr->base.functionId;
if (functionId < 0) {
SUdfInfo* pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
if (pUdfInfo->funcType == TSDB_UDF_TYPE_AGGREGATE) {
return true;
}
continue;
}
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) { if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TS_DUMMY) {
continue; continue;
} }
...@@ -1302,6 +1334,12 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) { ...@@ -1302,6 +1334,12 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
tfree(pUpQueryInfo); tfree(pUpQueryInfo);
} }
if (pQueryInfo->udfCopy) {
pQueryInfo->pUdfInfo = taosArrayDestroy(pQueryInfo->pUdfInfo);
} else {
pQueryInfo->pUdfInfo = tscDestroyUdfArrayList(pQueryInfo->pUdfInfo);
}
freeQueryInfoImpl(pQueryInfo); freeQueryInfoImpl(pQueryInfo);
clearAllTableMetaInfo(pQueryInfo, removeMeta); clearAllTableMetaInfo(pQueryInfo, removeMeta);
...@@ -1535,6 +1573,47 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) { ...@@ -1535,6 +1573,47 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
return NULL; return NULL;
} }
void freeUdfInfo(SUdfInfo* pUdfInfo) {
if (pUdfInfo == NULL) {
return;
}
if (pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY]) {
(*(udfDestroyFunc)pUdfInfo->funcs[TSDB_UDF_FUNC_DESTROY])(&pUdfInfo->init);
}
tfree(pUdfInfo->name);
if (pUdfInfo->path) {
unlink(pUdfInfo->path);
}
tfree(pUdfInfo->path);
tfree(pUdfInfo->content);
taosCloseDll(pUdfInfo->handle);
}
void* tscDestroyUdfArrayList(SArray* pUdfList) {
if (pUdfList == NULL) {
return NULL;
}
size_t size = taosArrayGetSize(pUdfList);
for (int32_t i = 0; i < size; i++) {
SUdfInfo* udf = taosArrayGet(pUdfList, i);
freeUdfInfo(udf);
}
taosArrayDestroy(pUdfList);
return NULL;
}
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) { void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) {
if (pBlockHashTable == NULL) { if (pBlockHashTable == NULL) {
return NULL; return NULL;
...@@ -2985,6 +3064,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -2985,6 +3064,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->tsBuf = NULL; pQueryInfo->tsBuf = NULL;
pQueryInfo->fillType = pSrc->fillType; pQueryInfo->fillType = pSrc->fillType;
pQueryInfo->fillVal = NULL; pQueryInfo->fillVal = NULL;
pQueryInfo->numOfFillVal = 0;;
pQueryInfo->clauseLimit = pSrc->clauseLimit; pQueryInfo->clauseLimit = pSrc->clauseLimit;
pQueryInfo->prjOffset = pSrc->prjOffset; pQueryInfo->prjOffset = pSrc->prjOffset;
pQueryInfo->numOfTables = 0; pQueryInfo->numOfTables = 0;
...@@ -3020,11 +3100,12 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3020,11 +3100,12 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
} }
if (pSrc->fillType != TSDB_FILL_NONE) { if (pSrc->fillType != TSDB_FILL_NONE) {
pQueryInfo->fillVal = malloc(pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); pQueryInfo->fillVal = calloc(1, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) { if (pQueryInfo->fillVal == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY; code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pQueryInfo->numOfFillVal = pSrc->fieldsInfo.numOfOutput;
memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pQueryInfo->fillVal, pSrc->fillVal, pSrc->fieldsInfo.numOfOutput * sizeof(int64_t));
} }
...@@ -3056,6 +3137,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) { ...@@ -3056,6 +3137,14 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
tscAddTableMetaInfo(pQueryInfo, &p1->name, pMeta, p1->vgroupList, p1->tagColList, p1->pVgroupTables); tscAddTableMetaInfo(pQueryInfo, &p1->name, pMeta, p1->vgroupList, p1->tagColList, p1->pVgroupTables);
} }
SArray *pUdfInfo = NULL;
if (pSrc->pUdfInfo) {
pUdfInfo = taosArrayDup(pSrc->pUdfInfo);
}
pQueryInfo->pUdfInfo = pUdfInfo;
pQueryInfo->udfCopy = true;
_error: _error:
return code; return code;
} }
...@@ -3102,7 +3191,9 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) { ...@@ -3102,7 +3191,9 @@ void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo) {
info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn); info->vgInfo.epAddr[j].fqdn = strdup(pInfo->vgInfo.epAddr[j].fqdn);
} }
if (pInfo->itemList) {
info->itemList = taosArrayDup(pInfo->itemList); info->itemList = taosArrayDup(pInfo->itemList);
}
} }
SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) { SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) {
...@@ -3353,6 +3444,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3353,6 +3444,11 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd); SQueryInfo* pNewQueryInfo = tscGetQueryInfo(pnCmd);
if (pQueryInfo->pUdfInfo) {
pNewQueryInfo->pUdfInfo = taosArrayDup(pQueryInfo->pUdfInfo);
pNewQueryInfo->udfCopy = true;
}
pNewQueryInfo->command = pQueryInfo->command; pNewQueryInfo->command = pQueryInfo->command;
pnCmd->active = pNewQueryInfo; pnCmd->active = pNewQueryInfo;
...@@ -3366,11 +3462,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3366,11 +3462,13 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
pNewQueryInfo->tsBuf = NULL; pNewQueryInfo->tsBuf = NULL;
pNewQueryInfo->fillType = pQueryInfo->fillType; pNewQueryInfo->fillType = pQueryInfo->fillType;
pNewQueryInfo->fillVal = NULL; pNewQueryInfo->fillVal = NULL;
pNewQueryInfo->numOfFillVal = 0;
pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit; pNewQueryInfo->clauseLimit = pQueryInfo->clauseLimit;
pNewQueryInfo->prjOffset = pQueryInfo->prjOffset; pNewQueryInfo->prjOffset = pQueryInfo->prjOffset;
pNewQueryInfo->numOfTables = 0; pNewQueryInfo->numOfTables = 0;
pNewQueryInfo->pTableMetaInfo = NULL; pNewQueryInfo->pTableMetaInfo = NULL;
pNewQueryInfo->bufLen = pQueryInfo->bufLen; pNewQueryInfo->bufLen = pQueryInfo->bufLen;
pNewQueryInfo->buf = malloc(pQueryInfo->bufLen); pNewQueryInfo->buf = malloc(pQueryInfo->bufLen);
if (pNewQueryInfo->buf == NULL) { if (pNewQueryInfo->buf == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
...@@ -3396,11 +3494,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t ...@@ -3396,11 +3494,14 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t
} }
if (pQueryInfo->fillType != TSDB_FILL_NONE) { if (pQueryInfo->fillType != TSDB_FILL_NONE) {
pNewQueryInfo->fillVal = malloc(pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); //just make memory memory sanitizer happy
//refator later
pNewQueryInfo->fillVal = calloc(1, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
if (pNewQueryInfo->fillVal == NULL) { if (pNewQueryInfo->fillVal == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error; goto _error;
} }
pNewQueryInfo->numOfFillVal = pQueryInfo->fieldsInfo.numOfOutput;
memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t)); memcpy(pNewQueryInfo->fillVal, pQueryInfo->fillVal, pQueryInfo->fieldsInfo.numOfOutput * sizeof(int64_t));
} }
...@@ -4210,7 +4311,7 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI ...@@ -4210,7 +4311,7 @@ int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaI
int32_t inter = 0; int32_t inter = 0;
getResultDataInfo(pSource->base.colType, pSource->base.colBytes, functionId, 0, &pse->resType, getResultDataInfo(pSource->base.colType, pSource->base.colBytes, functionId, 0, &pse->resType,
&pse->resBytes, &inter, 0, false); &pse->resBytes, &inter, 0, false, NULL);
pse->colType = pse->resType; pse->colType = pse->resType;
pse->colBytes = pse->resBytes; pse->colBytes = pse->resBytes;
...@@ -4277,8 +4378,14 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu ...@@ -4277,8 +4378,14 @@ static int32_t createGlobalAggregateExpr(SQueryAttr* pQueryAttr, SQueryInfo* pQu
functionId = TSDB_FUNC_STDDEV; functionId = TSDB_FUNC_STDDEV;
} }
SUdfInfo* pUdfInfo = NULL;
if (functionId < 0) {
pUdfInfo = taosArrayGet(pQueryInfo->pUdfInfo, -1 * functionId - 1);
}
getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter, getResultDataInfo(pExpr->base.colType, pExpr->base.colBytes, functionId, 0, &pse->resType, &pse->resBytes, &inter,
0, false); 0, false, pUdfInfo);
} }
} }
...@@ -4354,6 +4461,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4354,6 +4461,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
pQueryAttr->order = pQueryInfo->order; pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType; pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->havingNum = pQueryInfo->havingFieldNum; pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
pQueryAttr->pUdfInfo = pQueryInfo->pUdfInfo;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window; pQueryAttr->window = pQueryInfo->window;
...@@ -4420,7 +4528,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt ...@@ -4420,7 +4528,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
if (pQueryAttr->fillType != TSDB_FILL_NONE) { if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t)); pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryAttr->numOfOutput * sizeof(int64_t)); memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryInfo->numOfFillVal * sizeof(int64_t));
} }
pQueryAttr->srcRowSize = 0; pQueryAttr->srcRowSize = 0;
...@@ -4466,8 +4574,13 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq ...@@ -4466,8 +4574,13 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
strncpy(tablename, *str, TSDB_TABLE_FNAME_LEN); strncpy(tablename, *str, TSDB_TABLE_FNAME_LEN);
len = (int32_t) strlen(tablename); len = (int32_t) strlen(tablename);
} else { } else {
memcpy(tablename, *str, nextStr - (*str));
len = (int32_t)(nextStr - (*str)); len = (int32_t)(nextStr - (*str));
if (len >= TSDB_TABLE_NAME_LEN) {
sprintf(pCmd->payload, "table name too long");
return TSDB_CODE_TSC_INVALID_OPERATION;
}
memcpy(tablename, *str, nextStr - (*str));
tablename[len] = '\0'; tablename[len] = '\0';
} }
...@@ -4479,9 +4592,8 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq ...@@ -4479,9 +4592,8 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
// Check if the table name available or not // Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) { if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid"); sprintf(pCmd->payload, "table name is invalid");
return code; return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
} }
SName name = {0}; SName name = {0};
...@@ -4497,6 +4609,15 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq ...@@ -4497,6 +4609,15 @@ static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSq
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t nameComparFn(const void* n1, const void* n2) {
int32_t ret = strcmp(*(char**)n1, *(char**)n2);
if (ret == 0) {
return 0;
} else {
return ret > 0? 1:-1;
}
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) { int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) {
SSqlCmd *pCmd = &pSql->cmd; SSqlCmd *pCmd = &pSql->cmd;
...@@ -4532,12 +4653,44 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt ...@@ -4532,12 +4653,44 @@ int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t lengt
} }
} }
if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) { size_t len = taosArrayGetSize(pNameArray);
if (len == 1) {
return TSDB_CODE_SUCCESS;
}
if (len > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number"); sprintf(pCmd->payload, "tables over the max number");
return code; return code;
} }
taosArraySort(pNameArray, nameComparFn);
int32_t pos = 0;
for(int32_t i = 1; i < len; ++i) {
char** p1 = taosArrayGet(pNameArray, pos);
char** p2 = taosArrayGet(pNameArray, i);
if (strcmp(*p1, *p2) == 0) {
// do nothing
} else {
if (pos + 1 != i) {
char* p = taosArrayGetP(pNameArray, pos + 1);
tfree(p);
taosArraySet(pNameArray, pos + 1, p2);
pos += 1;
} else {
pos += 1;
}
}
}
for(int32_t i = pos + 1; i < pNameArray->size; ++i) {
char* p = taosArrayGetP(pNameArray, i);
tfree(p);
}
pNameArray->size = pos + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -41,8 +41,10 @@ enum { ...@@ -41,8 +41,10 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MGMT, "mgmt" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_DB, "create-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_TABLE, "create-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_FUNCTION, "create-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_DB, "drop-db" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_TABLE, "drop-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_FUNCTION, "drop-function" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_ACCT, "create-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_USER, "create-user" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_ACCT, "drop-acct" )
...@@ -74,6 +76,7 @@ enum { ...@@ -74,6 +76,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_STABLEVGROUP, "stable-vgroup" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_MULTI_META, "multi-meta" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_HB, "heart-beat" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_RETRIEVE_FUNC, "retrieve-function" )
// SQL below for client local // SQL below for client local
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" ) TSDB_DEFINE_SQL_TYPE( TSDB_SQL_LOCAL, "local" )
......
...@@ -31,6 +31,14 @@ extern "C" { ...@@ -31,6 +31,14 @@ extern "C" {
memcpy(varDataVal(x), (str), __len); \ memcpy(varDataVal(x), (str), __len); \
} while (0); } while (0);
#define STR_TO_NET_VARSTR(x, str) \
do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \
*(VarDataLenT *)(x) = htons(__len); \
memcpy(varDataVal(x), (str), __len); \
} while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \ do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
......
...@@ -18,7 +18,58 @@ ...@@ -18,7 +18,58 @@
#include "ttype.h" #include "ttype.h"
#include "tutil.h" #include "tutil.h"
#include "tarithoperator.h" #include "tarithoperator.h"
#include "tcompare.h"
//GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i]));
#define ARRAY_LIST_OP_DIV(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \
{ \
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; \
\
if ((len1) == (len2)) { \
for (; i < (len2) && i >= 0; i += step, (out) += 1) { \
if (isNull((char *)&((left)[i]), _left_type) || isNull((char *)&((right)[i]), _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] op(right)[i]; \
} \
} else if ((len1) == 1) { \
for (; i >= 0 && i < (len2); i += step, (out) += 1) { \
if (isNull((char *)(left), _left_type) || isNull((char *)&(right)[i], _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[0] op(right)[i]; \
} \
} else if ((len2) == 1) { \
for (; i >= 0 && i < (len1); i += step, (out) += 1) { \
if (isNull((char *)&(left)[i], _left_type) || isNull((char *)(right), _right_type)) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[0])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] op(right)[0]; \
} \
} \
}
#define ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \ #define ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \
{ \ { \
int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \ int32_t i = ((_ord) == TSDB_ORDER_ASC) ? 0 : MAX(len1, len2) - 1; \
...@@ -62,6 +113,12 @@ ...@@ -62,6 +113,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[i])) * (right)[i]; \ *(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[i])) * (right)[i]; \
} \ } \
} else if (len1 == 1) { \ } else if (len1 == 1) { \
...@@ -70,6 +127,12 @@ ...@@ -70,6 +127,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[i])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[0] - ((int64_t)(((double)(left)[0]) / (right)[i])) * (right)[i]; \ *(out) = (double)(left)[0] - ((int64_t)(((double)(left)[0]) / (right)[i])) * (right)[i]; \
} \ } \
} else if ((len2) == 1) { \ } else if ((len2) == 1) { \
...@@ -78,6 +141,12 @@ ...@@ -78,6 +141,12 @@
SET_DOUBLE_NULL(out); \ SET_DOUBLE_NULL(out); \
continue; \ continue; \
} \ } \
double v, z = 0.0; \
GET_TYPED_DATA(v, double, _right_type, (char *)&((right)[0])); \
if (getComparFunc(TSDB_DATA_TYPE_DOUBLE, 0)(&v, &z) == 0) { \
SET_DOUBLE_NULL(out); \
continue; \
} \
*(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[0])) * (right)[0]; \ *(out) = (double)(left)[i] - ((int64_t)(((double)(left)[i]) / (right)[0])) * (right)[0]; \
} \ } \
} \ } \
...@@ -90,7 +159,7 @@ ...@@ -90,7 +159,7 @@
#define ARRAY_LIST_MULTI(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_MULTI(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, *, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, *, TSDB_DATA_TYPE_DOUBLE, _ord)
#define ARRAY_LIST_DIV(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_DIV(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, /, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP_DIV(left, right, _left_type, _right_type, len1, len2, out, /, TSDB_DATA_TYPE_DOUBLE, _ord)
#define ARRAY_LIST_REM(left, right, _left_type, _right_type, len1, len2, out, _ord) \ #define ARRAY_LIST_REM(left, right, _left_type, _right_type, len1, len2, out, _ord) \
ARRAY_LIST_OP_REM(left, right, _left_type, _right_type, len1, len2, out, %, TSDB_DATA_TYPE_DOUBLE, _ord) ARRAY_LIST_OP_REM(left, right, _left_type, _right_type, len1, len2, out, %, TSDB_DATA_TYPE_DOUBLE, _ord)
......
此差异已折叠。
...@@ -2,6 +2,7 @@ package com.taosdata.jdbc; ...@@ -2,6 +2,7 @@ package com.taosdata.jdbc;
import java.sql.*; import java.sql.*;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
public abstract class AbstractStatement extends WrapperImpl implements Statement { public abstract class AbstractStatement extends WrapperImpl implements Statement {
...@@ -196,14 +197,45 @@ public abstract class AbstractStatement extends WrapperImpl implements Statement ...@@ -196,14 +197,45 @@ public abstract class AbstractStatement extends WrapperImpl implements Statement
if (batchedArgs == null || batchedArgs.isEmpty()) if (batchedArgs == null || batchedArgs.isEmpty())
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_BATCH_IS_EMPTY); throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_BATCH_IS_EMPTY);
String clientInfo = getConnection().getClientInfo(TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNORE);
boolean batchErrorIgnore = clientInfo == null ? TSDBConstants.DEFAULT_BATCH_ERROR_IGNORE : Boolean.parseBoolean(clientInfo);
if (batchErrorIgnore) {
return executeBatchIgnoreException();
}
return executeBatchThrowException();
}
private int[] executeBatchIgnoreException() {
return batchedArgs.stream().mapToInt(sql -> {
try {
boolean isSelect = execute(sql);
if (isSelect) {
return SUCCESS_NO_INFO;
} else {
return getUpdateCount();
}
} catch (SQLException e) {
return EXECUTE_FAILED;
}
}).toArray();
}
private int[] executeBatchThrowException() throws BatchUpdateException {
int[] res = new int[batchedArgs.size()]; int[] res = new int[batchedArgs.size()];
for (int i = 0; i < batchedArgs.size(); i++) { for (int i = 0; i < batchedArgs.size(); i++) {
try {
boolean isSelect = execute(batchedArgs.get(i)); boolean isSelect = execute(batchedArgs.get(i));
if (isSelect) { if (isSelect) {
res[i] = SUCCESS_NO_INFO; res[i] = SUCCESS_NO_INFO;
} else { } else {
res[i] = getUpdateCount(); res[i] = getUpdateCount();
} }
} catch (SQLException e) {
String reason = e.getMessage();
int[] updateCounts = Arrays.copyOfRange(res, 0, i);
throw new BatchUpdateException(reason, updateCounts, e);
}
} }
return res; return res;
} }
......
...@@ -74,6 +74,8 @@ public abstract class TSDBConstants { ...@@ -74,6 +74,8 @@ public abstract class TSDBConstants {
public static final String DEFAULT_PRECISION = "ms"; public static final String DEFAULT_PRECISION = "ms";
public static final boolean DEFAULT_BATCH_ERROR_IGNORE = false;
public static int typeName2JdbcType(String type) { public static int typeName2JdbcType(String type) {
switch (type.toUpperCase()) { switch (type.toUpperCase()) {
case "TIMESTAMP": case "TIMESTAMP":
......
...@@ -100,6 +100,11 @@ public class TSDBDriver extends AbstractDriver { ...@@ -100,6 +100,11 @@ public class TSDBDriver extends AbstractDriver {
*/ */
public static final String PROPERTY_KEY_TIMESTAMP_FORMAT = "timestampFormat"; public static final String PROPERTY_KEY_TIMESTAMP_FORMAT = "timestampFormat";
/**
* continue process commands in executeBatch
*/
public static final String PROPERTY_KEY_BATCH_ERROR_IGNORE = "batchErrorIgnore";
private TSDBDatabaseMetaData dbMetaData = null; private TSDBDatabaseMetaData dbMetaData = null;
static { static {
......
...@@ -19,6 +19,7 @@ import com.taosdata.jdbc.utils.NullType; ...@@ -19,6 +19,7 @@ import com.taosdata.jdbc.utils.NullType;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Timestamp; import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
...@@ -463,6 +464,25 @@ public class TSDBResultSetRowData { ...@@ -463,6 +464,25 @@ public class TSDBResultSetRowData {
data.set(col, tsObj); data.set(col, tsObj);
} }
/**
* this implementation is used for TDengine old version
*/
public void setTimestamp(int col, long ts) {
//TODO: this implementation contains logical error
// when precision is us the (long ts) is 16 digital number
// when precision is ms, the (long ts) is 13 digital number
// we need a JNI function like this:
// public void setTimestamp(int col, long epochSecond, long nanoAdjustment)
if (ts < 1_0000_0000_0000_0L) {
data.set(col, new Timestamp(ts));
} else {
long epochSec = ts / 1000_000L;
long nanoAdjustment = ts % 1000_000L * 1000L;
Timestamp timestamp = Timestamp.from(Instant.ofEpochSecond(epochSec, nanoAdjustment));
data.set(col, timestamp);
}
}
public Timestamp getTimestamp(int col, int nativeType) { public Timestamp getTimestamp(int col, int nativeType) {
Object obj = data.get(col - 1); Object obj = data.get(col - 1);
if (obj == null) if (obj == null)
......
...@@ -5,7 +5,6 @@ import com.google.common.collect.RangeSet; ...@@ -5,7 +5,6 @@ import com.google.common.collect.RangeSet;
import com.google.common.collect.TreeRangeSet; import com.google.common.collect.TreeRangeSet;
import com.taosdata.jdbc.enums.TimestampPrecision; import com.taosdata.jdbc.enums.TimestampPrecision;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets; import java.nio.charset.StandardCharsets;
import java.sql.Date; import java.sql.Date;
import java.sql.Time; import java.sql.Time;
...@@ -110,15 +109,27 @@ public class Utils { ...@@ -110,15 +109,27 @@ public class Utils {
return rawSql; return rawSql;
// toLowerCase // toLowerCase
String preparedSql = rawSql.trim().toLowerCase(); String preparedSql = rawSql.trim().toLowerCase();
String[] clause = new String[]{"values\\s*\\([\\s\\S]*?\\)", "tags\\s*\\([\\s\\S]*?\\)", "where[\\s\\S]*"}; String[] clause = new String[]{"tags\\s*\\([\\s\\S]*?\\)", "where[\\s\\S]*"};
Map<Integer, Integer> placeholderPositions = new HashMap<>(); Map<Integer, Integer> placeholderPositions = new HashMap<>();
RangeSet<Integer> clauseRangeSet = TreeRangeSet.create(); RangeSet<Integer> clauseRangeSet = TreeRangeSet.create();
findPlaceholderPosition(preparedSql, placeholderPositions); findPlaceholderPosition(preparedSql, placeholderPositions);
// find tags and where clause's position
findClauseRangeSet(preparedSql, clause, clauseRangeSet); findClauseRangeSet(preparedSql, clause, clauseRangeSet);
// find values clause's position
findValuesClauseRangeSet(preparedSql, clauseRangeSet);
return transformSql(rawSql, parameters, placeholderPositions, clauseRangeSet); return transformSql(rawSql, parameters, placeholderPositions, clauseRangeSet);
} }
private static void findValuesClauseRangeSet(String preparedSql, RangeSet<Integer> clauseRangeSet) {
Matcher matcher = Pattern.compile("(values|,)\\s*(\\([^)]*\\))").matcher(preparedSql);
while (matcher.find()) {
int start = matcher.start(2);
int end = matcher.end(2);
clauseRangeSet.add(Range.closedOpen(start, end));
}
}
private static void findClauseRangeSet(String preparedSql, String[] regexArr, RangeSet<Integer> clauseRangeSet) { private static void findClauseRangeSet(String preparedSql, String[] regexArr, RangeSet<Integer> clauseRangeSet) {
clauseRangeSet.clear(); clauseRangeSet.clear();
for (String regex : regexArr) { for (String regex : regexArr) {
...@@ -126,7 +137,7 @@ public class Utils { ...@@ -126,7 +137,7 @@ public class Utils {
while (matcher.find()) { while (matcher.find()) {
int start = matcher.start(); int start = matcher.start();
int end = matcher.end(); int end = matcher.end();
clauseRangeSet.add(Range.closed(start, end)); clauseRangeSet.add(Range.closedOpen(start, end));
} }
} }
} }
......
...@@ -841,13 +841,13 @@ public class TSDBPreparedStatementTest { ...@@ -841,13 +841,13 @@ public class TSDBPreparedStatementTest {
} }
@Test @Test
public void setBytes() throws SQLException, IOException { public void setBytes() throws SQLException {
// given // given
long ts = System.currentTimeMillis(); long ts = System.currentTimeMillis();
byte[] f8 = "{\"name\": \"john\", \"age\": 10, \"address\": \"192.168.1.100\"}".getBytes(); byte[] f8 = "{\"name\": \"john\", \"age\": 10, \"address\": \"192.168.1.100\"}".getBytes();
// when // when
pstmt_insert.setTimestamp(1, new Timestamp(System.currentTimeMillis())); pstmt_insert.setTimestamp(1, new Timestamp(ts));
pstmt_insert.setBytes(9, f8); pstmt_insert.setBytes(9, f8);
int result = pstmt_insert.executeUpdate(); int result = pstmt_insert.executeUpdate();
......
package com.taosdata.jdbc.cases;
import org.junit.*;
import java.sql.*;
import java.util.stream.IntStream;
public class BatchErrorIgnoreTest {
private static final String host = "127.0.0.1";
@Test
public void batchErrorThrowException() throws SQLException {
// given
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
// when
try (Statement stmt = conn.createStatement()) {
IntStream.range(1, 6).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("insert into t11 values(now, 11)");
IntStream.range(6, 11).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + "),(now + 1s, " + (10 * i) + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("select count(*) from test.weather");
stmt.executeBatch();
} catch (BatchUpdateException e) {
int[] updateCounts = e.getUpdateCounts();
Assert.assertEquals(5, updateCounts.length);
Assert.assertEquals(1, updateCounts[0]);
Assert.assertEquals(1, updateCounts[1]);
Assert.assertEquals(1, updateCounts[2]);
Assert.assertEquals(1, updateCounts[3]);
Assert.assertEquals(1, updateCounts[4]);
}
}
@Test
public void batchErrorIgnore() throws SQLException {
// given
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata&batchErrorIgnore=true");
// when
int[] results = null;
try (Statement stmt = conn.createStatement()) {
IntStream.range(1, 6).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("insert into t11 values(now, 11)");
IntStream.range(6, 11).mapToObj(i -> "insert into test.t" + i + " values(now, " + i + "),(now + 1s, " + (10 * i) + ")").forEach(sql -> {
try {
stmt.addBatch(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.addBatch("select count(*) from test.weather");
results = stmt.executeBatch();
} catch (SQLException e) {
e.printStackTrace();
}
// then
assert results != null;
Assert.assertEquals(12, results.length);
Assert.assertEquals(1, results[0]);
Assert.assertEquals(1, results[1]);
Assert.assertEquals(1, results[2]);
Assert.assertEquals(1, results[3]);
Assert.assertEquals(1, results[4]);
Assert.assertEquals(Statement.EXECUTE_FAILED, results[5]);
Assert.assertEquals(2, results[6]);
Assert.assertEquals(2, results[7]);
Assert.assertEquals(2, results[8]);
Assert.assertEquals(2, results[9]);
Assert.assertEquals(2, results[10]);
Assert.assertEquals(Statement.SUCCESS_NO_INFO, results[11]);
}
@Before
public void before() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("use test");
stmt.execute("drop table if exists weather");
stmt.execute("create table weather (ts timestamp, f1 float) tags(t1 int)");
IntStream.range(1, 11).mapToObj(i -> "create table t" + i + " using weather tags(" + i + ")").forEach(sql -> {
try {
stmt.execute(sql);
} catch (SQLException e) {
e.printStackTrace();
}
});
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@BeforeClass
public static void beforeClass() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test");
stmt.execute("create database if not exists test");
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
@AfterClass
public static void afterClass() {
try {
Connection conn = DriverManager.getConnection("jdbc:TAOS://" + host + ":6030/?user=root&password=taosdata");
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists test");
stmt.close();
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
...@@ -3,6 +3,8 @@ package com.taosdata.jdbc.utils; ...@@ -3,6 +3,8 @@ package com.taosdata.jdbc.utils;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream; import java.util.stream.Stream;
public class UtilsTest { public class UtilsTest {
...@@ -32,7 +34,7 @@ public class UtilsTest { ...@@ -32,7 +34,7 @@ public class UtilsTest {
} }
@Test @Test
public void getNativeSqlReplaceQuestionMarks() { public void lowerCase() {
// given // given
String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)"; String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)";
Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray(); Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray();
...@@ -46,7 +48,7 @@ public class UtilsTest { ...@@ -46,7 +48,7 @@ public class UtilsTest {
} }
@Test @Test
public void getNativeSqlReplaceQuestionMarks2() { public void upperCase() {
// given // given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?)"; String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 123, 3.14, 220, 4).toArray(); Object[] parameters = Stream.of("d1", 1, 123, 3.14, 220, 4).toArray();
...@@ -59,9 +61,49 @@ public class UtilsTest { ...@@ -59,9 +61,49 @@ public class UtilsTest {
Assert.assertEquals(expected, actual); Assert.assertEquals(expected, actual);
} }
@Test
public void multiValues() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,?,?),(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,'abc',4),(200,3.1415,'xyz',5)";
Assert.assertEquals(expected, actual);
}
@Test
public void lineTerminator() {
// given
String nativeSql = "INSERT INTO ? (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (?) VALUES (?,?,\r\n?,?),(?,?,?,?)";
Object[] parameters = Stream.of("d1", 1, 100, 3.14, "abc", 4, 200, 3.1415, "xyz", 5).toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT INTO d1 (TS,CURRENT,VOLTAGE,PHASE) USING METERS TAGS (1) VALUES (100,3.14,\r\n'abc',4),(200,3.1415,'xyz',5)";
Assert.assertEquals(expected, actual);
}
@Test
public void lineTerminatorAndMultiValues() {
String nativeSql = "INSERT Into ? TAGS(?) VALUES(?,?,\r\n?,?),(?,? ,\r\n?,?) t? tags (?) Values (?,?,?\r\n,?),(?,?,?,?) t? Tags(?) values (?,?,?,?) , (?,?,?,?)";
Object[] parameters = Stream.of("t1", "abc", 100, 1.1, "xxx", "xxx", 200, 2.2, "xxx", "xxx", 2, "bcd", 300, 3.3, "xxx", "xxx", 400, 4.4, "xxx", "xxx", 3, "cde", 500, 5.5, "xxx", "xxx", 600, 6.6, "xxx", "xxx").toArray();
// when
String actual = Utils.getNativeSql(nativeSql, parameters);
// then
String expected = "INSERT Into t1 TAGS('abc') VALUES(100,1.1,\r\n'xxx','xxx'),(200,2.2 ,\r\n'xxx','xxx') t2 tags ('bcd') Values (300,3.3,'xxx'\r\n,'xxx'),(400,4.4,'xxx','xxx') t3 Tags('cde') values (500,5.5,'xxx','xxx') , (600,6.6,'xxx','xxx')";
Assert.assertEquals(expected, actual);
}
@Test @Test
public void getNativeSqlReplaceNothing() { public void replaceNothing() {
// given // given
String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)"; String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)";
...@@ -73,7 +115,7 @@ public class UtilsTest { ...@@ -73,7 +115,7 @@ public class UtilsTest {
} }
@Test @Test
public void getNativeSqlReplaceNothing2() { public void replaceNothing2() {
// given // given
String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)"; String nativeSql = "insert into test.t1 (ts, temperature, humidity) using test.weather tags('beijing',1) values(now, 12.2, 4)";
Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray(); Object[] parameters = Stream.of("test", "t1", "test", "weather", "beijing", 1, 12.2, 4).toArray();
...@@ -86,7 +128,7 @@ public class UtilsTest { ...@@ -86,7 +128,7 @@ public class UtilsTest {
} }
@Test @Test
public void getNativeSqlReplaceNothing3() { public void replaceNothing3() {
// given // given
String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)"; String nativeSql = "insert into ?.? (ts, temperature, humidity) using ?.? tags(?,?) values(now, ?, ?)";
......
...@@ -18,7 +18,7 @@ ELSE () ...@@ -18,7 +18,7 @@ ELSE ()
ENDIF () ENDIF ()
ADD_EXECUTABLE(taosd ${SRC}) ADD_EXECUTABLE(taosd ${SRC})
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync ${LINK_JEMALLOC}) TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lua lz4 balance sync ${LINK_JEMALLOC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosd taos_static) TARGET_LINK_LIBRARIES(taosd taos_static)
......
...@@ -40,6 +40,7 @@ ...@@ -40,6 +40,7 @@
#include "dnodeShell.h" #include "dnodeShell.h"
#include "dnodeTelemetry.h" #include "dnodeTelemetry.h"
#include "module.h" #include "module.h"
#include "qScript.h"
#include "mnode.h" #include "mnode.h"
#if !defined(_MODULE) || !defined(_TD_LINUX) #if !defined(_MODULE) || !defined(_TD_LINUX)
...@@ -84,6 +85,7 @@ static SStep tsDnodeSteps[] = { ...@@ -84,6 +85,7 @@ static SStep tsDnodeSteps[] = {
{"dnode-shell", dnodeInitShell, dnodeCleanupShell}, {"dnode-shell", dnodeInitShell, dnodeCleanupShell},
{"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer}, {"dnode-statustmr", dnodeInitStatusTimer,dnodeCleanupStatusTimer},
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry}, {"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
{"dnode-script", scriptEnvPoolInit, scriptEnvPoolCleanup},
}; };
static SStep tsDnodeCompactSteps[] = { static SStep tsDnodeCompactSteps[] = {
......
...@@ -48,9 +48,11 @@ int32_t dnodeInitShell() { ...@@ -48,9 +48,11 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE]= dnodeDispatchToMWriteQueue;
...@@ -72,6 +74,7 @@ int32_t dnodeInitShell() { ...@@ -72,6 +74,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = dnodeDispatchToMReadQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep; dnodeProcessShellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeSendStartupStep;
......
...@@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -202,12 +202,12 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite); taosGetQitem(pWorker->qall, &qtype, (void **)&pWrite);
dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite, dTrace("msg:%p, app:%p type:%s will be processed in vwrite queue, qtype:%s hver:%" PRIu64, pWrite,
pWrite->rpcMsg.ahandle, taosMsg[pWrite->pHead.msgType], qtypeStr[qtype], pWrite->pHead.version); pWrite->rpcMsg.ahandle, taosMsg[pWrite->walHead.msgType], qtypeStr[qtype], pWrite->walHead.version);
pWrite->code = vnodeProcessWrite(pVnode, &pWrite->pHead, qtype, pWrite); pWrite->code = vnodeProcessWrite(pVnode, &pWrite->walHead, qtype, pWrite);
if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1); if (pWrite->code <= 0) atomic_add_fetch_32(&pWrite->processedCount, 1);
if (pWrite->code > 0) pWrite->code = 0; if (pWrite->code > 0) pWrite->code = 0;
if (pWrite->code == 0 && pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true; if (pWrite->code == 0 && pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT) forceFsync = true;
dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code); dTrace("msg:%p is processed in vwrite queue, code:0x%x", pWrite, pWrite->code);
} }
...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) { ...@@ -222,7 +222,7 @@ static void *dnodeProcessVWriteQueue(void *wparam) {
dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code); dnodeSendRpcVWriteRsp(pVnode, pWrite, pWrite->code);
} else { } else {
if (qtype == TAOS_QTYPE_FWD) { if (qtype == TAOS_QTYPE_FWD) {
vnodeConfirmForward(pVnode, pWrite->pHead.version, pWrite->code, pWrite->pHead.msgType != TSDB_MSG_TYPE_SUBMIT); vnodeConfirmForward(pVnode, pWrite->walHead.version, pWrite->code, pWrite->walHead.msgType != TSDB_MSG_TYPE_SUBMIT);
} }
if (pWrite->rspRet.rsp) { if (pWrite->rspRet.rsp) {
rpcFreeCont(pWrite->rspRet.rsp); rpcFreeCont(pWrite->rspRet.rsp);
......
...@@ -28,7 +28,7 @@ typedef void* qinfo_t; ...@@ -28,7 +28,7 @@ typedef void* qinfo_t;
* @param qinfo * @param qinfo
* @return * @return
*/ */
int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t *qId); int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryTableMsg, qinfo_t* qinfo, uint64_t qId);
/** /**
......
...@@ -186,6 +186,10 @@ do { \ ...@@ -186,6 +186,10 @@ do { \
#define TSDB_NODE_NAME_LEN 64 #define TSDB_NODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string #define TSDB_TABLE_NAME_LEN 193 // it is a null-terminated string
#define TSDB_DB_NAME_LEN 33 #define TSDB_DB_NAME_LEN 33
#define TSDB_FUNC_NAME_LEN 65
#define TSDB_FUNC_CODE_LEN (65535 - 512)
#define TSDB_FUNC_BUF_SIZE 512
#define TSDB_TYPE_STR_MAX_LEN 32
#define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN) #define TSDB_TABLE_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_TABLE_NAME_LEN)
#define TSDB_COL_NAME_LEN 65 #define TSDB_COL_NAME_LEN 65
#define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64
...@@ -332,6 +336,10 @@ do { \ ...@@ -332,6 +336,10 @@ do { \
#define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type #define TSDB_QUERY_TYPE_NON_TYPE 0x00u // none type
#define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode #define TSDB_QUERY_TYPE_FREE_RESOURCE 0x01u // free qhandle at vnode
#define TSDB_UDF_TYPE_SCALAR 1
#define TSDB_UDF_TYPE_AGGREGATE 2
/* /*
* 1. ordinary sub query for select * from super_table * 1. ordinary sub query for select * from super_table
* 2. all sqlobj generated by createSubqueryObj with this flag * 2. all sqlobj generated by createSubqueryObj with this flag
......
...@@ -100,6 +100,7 @@ int32_t* taosGetErrno(); ...@@ -100,6 +100,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217) //"Database not specified or available") #define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217) //"Database not specified or available")
#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist") #define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) //"Table does not exist")
#define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config") #define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) //"SQL statement too long check maxSQLLength config")
#define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) //"File is empty")
// mnode // mnode
#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed") #define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0300) //"Message not processed")
...@@ -174,6 +175,13 @@ int32_t* taosGetErrno(); ...@@ -174,6 +175,13 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) //"Super table does not exist") #define TSDB_CODE_MND_INVALID_STABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x036D) //"Super table does not exist")
#define TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG TAOS_DEF_ERROR_CODE(0, 0x036E) //"Invalid create table message") #define TSDB_CODE_MND_INVALID_CREATE_TABLE_MSG TAOS_DEF_ERROR_CODE(0, 0x036E) //"Invalid create table message")
#define TSDB_CODE_MND_INVALID_FUNC_NAME TAOS_DEF_ERROR_CODE(0, 0x0370) //"Invalid func name")
#define TSDB_CODE_MND_INVALID_FUNC_LEN TAOS_DEF_ERROR_CODE(0, 0x0371) //"Invalid func length")
#define TSDB_CODE_MND_INVALID_FUNC_CODE TAOS_DEF_ERROR_CODE(0, 0x0372) //"Invalid func code")
#define TSDB_CODE_MND_FUNC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0373) //"Func already exists")
#define TSDB_CODE_MND_INVALID_FUNC TAOS_DEF_ERROR_CODE(0, 0x0374) //"Invalid func")
#define TSDB_CODE_MND_INVALID_FUNC_BUFSIZE TAOS_DEF_ERROR_CODE(0, 0x0375) //"Invalid func bufSize")
#define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available") #define TSDB_CODE_MND_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0380) //"Database not specified or available")
#define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists") #define TSDB_CODE_MND_DB_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0381) //"Database already exists")
#define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) //"Invalid database options") #define TSDB_CODE_MND_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x0382) //"Invalid database options")
...@@ -261,6 +269,7 @@ int32_t* taosGetErrno(); ...@@ -261,6 +269,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query") #define TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW TAOS_DEF_ERROR_CODE(0, 0x070A) //"Too many time window in query")
#define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached") #define TSDB_CODE_QRY_NOT_ENOUGH_BUFFER TAOS_DEF_ERROR_CODE(0, 0x070B) //"Query buffer limit has reached")
#define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica") #define TSDB_CODE_QRY_INCONSISTAN TAOS_DEF_ERROR_CODE(0, 0x070C) //"File inconsistency in replica")
#define TSDB_CODE_QRY_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x070D) //"System error")
// grant // grant
...@@ -395,6 +404,8 @@ int32_t* taosGetErrno(); ...@@ -395,6 +404,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find") #define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find")
#define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string") #define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string")
#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error")
// odbc // odbc
#define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory") #define TSDB_CODE_ODBC_OOM TAOS_DEF_ERROR_CODE(0, 0x2100) //"out of memory")
#define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input") #define TSDB_CODE_ODBC_CONV_CHAR_NOT_NUM TAOS_DEF_ERROR_CODE(0, 0x2101) //"convertion not a valid literal input")
......
...@@ -77,7 +77,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" ) ...@@ -77,7 +77,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_USER, "drop-user" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DNODE, "create-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DNODE, "drop-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_DB, "create-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_FUNCTION, "create-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_DB, "drop-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_FUNCTION, "drop-function" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_DB, "use-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_DB, "alter-db" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SYNC_DB, "sync-db-replica" )
...@@ -96,7 +98,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" ) ...@@ -96,7 +98,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_STREAM, "kill-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_KILL_CONN, "kill-conn" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONFIG_DNODE, "cm-config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_HEARTBEAT, "heartbeat" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY8, "dummy8" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_RETRIEVE_FUNC, "retrieve-func" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY9, "dummy9" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY10, "dummy10" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY11, "dummy11" )
...@@ -153,6 +155,7 @@ enum _mgmt_table { ...@@ -153,6 +155,7 @@ enum _mgmt_table {
TSDB_MGMT_TABLE_STREAMTABLES, TSDB_MGMT_TABLE_STREAMTABLES,
TSDB_MGMT_TABLE_CLUSTER, TSDB_MGMT_TABLE_CLUSTER,
TSDB_MGMT_TABLE_TP, TSDB_MGMT_TABLE_TP,
TSDB_MGMT_TABLE_FUNCTION,
TSDB_MGMT_TABLE_MAX, TSDB_MGMT_TABLE_MAX,
}; };
...@@ -507,6 +510,9 @@ typedef struct { ...@@ -507,6 +510,9 @@ typedef struct {
int32_t prevResultLen; // previous result length int32_t prevResultLen; // previous result length
int32_t numOfOperator; int32_t numOfOperator;
int32_t tableScanOperator;// table scan operator. -1 means no scan operator int32_t tableScanOperator;// table scan operator. -1 means no scan operator
int32_t udfNum; // number of udf function
int32_t udfContentOffset;
int32_t udfContentLen;
SColumnInfo tableCols[]; SColumnInfo tableCols[];
} SQueryTableMsg; } SQueryTableMsg;
...@@ -571,6 +577,41 @@ typedef struct { ...@@ -571,6 +577,41 @@ typedef struct {
int8_t reserve[5]; int8_t reserve[5];
} SCreateDbMsg, SAlterDbMsg; } SCreateDbMsg, SAlterDbMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
char path[PATH_MAX];
int32_t funcType;
uint8_t outputType;
int16_t outputLen;
int32_t bufSize;
int32_t codeLen;
char code[];
} SCreateFuncMsg;
typedef struct {
int32_t num;
char name[];
} SRetrieveFuncMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
int32_t funcType;
int8_t resType;
int16_t resBytes;
int32_t bufSize;
int32_t len;
char content[];
} SFunctionInfoMsg;
typedef struct {
int32_t num;
char content[];
} SUdfFuncMsg;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
} SDropFuncMsg;
typedef struct { typedef struct {
char db[TSDB_TABLE_FNAME_LEN]; char db[TSDB_TABLE_FNAME_LEN];
uint8_t ignoreNotExists; uint8_t ignoreNotExists;
...@@ -710,8 +751,10 @@ typedef struct { ...@@ -710,8 +751,10 @@ typedef struct {
} STableInfoMsg; } STableInfoMsg;
typedef struct { typedef struct {
uint8_t metaClone; // create local clone of the cached table meta
int32_t numOfVgroups; int32_t numOfVgroups;
int32_t numOfTables; int32_t numOfTables;
int32_t numOfUdfs;
char tableNames[]; char tableNames[];
} SMultiTableInfoMsg; } SMultiTableInfoMsg;
...@@ -762,7 +805,11 @@ typedef struct STableMetaMsg { ...@@ -762,7 +805,11 @@ typedef struct STableMetaMsg {
typedef struct SMultiTableMeta { typedef struct SMultiTableMeta {
int32_t numOfTables; int32_t numOfTables;
int32_t numOfVgroup; int32_t numOfVgroup;
int32_t numOfUdf;
int32_t contLen; int32_t contLen;
uint8_t compressed; // denote if compressed or not
uint32_t rawLen; // size before compress
uint8_t metaClone; // make meta clone after retrieve meta from mnode
char meta[]; char meta[];
} SMultiTableMeta; } SMultiTableMeta;
......
...@@ -27,7 +27,7 @@ typedef struct { ...@@ -27,7 +27,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN]; char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
FCqWrite cqWrite; FCqWrite cqWrite;
} SCqCfg; } SCqCfg;
......
...@@ -62,149 +62,154 @@ ...@@ -62,149 +62,154 @@
#define TK_SHOW 44 #define TK_SHOW 44
#define TK_DATABASES 45 #define TK_DATABASES 45
#define TK_TOPICS 46 #define TK_TOPICS 46
#define TK_MNODES 47 #define TK_FUNCTIONS 47
#define TK_DNODES 48 #define TK_MNODES 48
#define TK_ACCOUNTS 49 #define TK_DNODES 49
#define TK_USERS 50 #define TK_ACCOUNTS 50
#define TK_MODULES 51 #define TK_USERS 51
#define TK_QUERIES 52 #define TK_MODULES 52
#define TK_CONNECTIONS 53 #define TK_QUERIES 53
#define TK_STREAMS 54 #define TK_CONNECTIONS 54
#define TK_VARIABLES 55 #define TK_STREAMS 55
#define TK_SCORES 56 #define TK_VARIABLES 56
#define TK_GRANTS 57 #define TK_SCORES 57
#define TK_VNODES 58 #define TK_GRANTS 58
#define TK_IPTOKEN 59 #define TK_VNODES 59
#define TK_DOT 60 #define TK_IPTOKEN 60
#define TK_CREATE 61 #define TK_DOT 61
#define TK_TABLE 62 #define TK_CREATE 62
#define TK_STABLE 63 #define TK_TABLE 63
#define TK_DATABASE 64 #define TK_STABLE 64
#define TK_TABLES 65 #define TK_DATABASE 65
#define TK_STABLES 66 #define TK_TABLES 66
#define TK_VGROUPS 67 #define TK_STABLES 67
#define TK_DROP 68 #define TK_VGROUPS 68
#define TK_TOPIC 69 #define TK_DROP 69
#define TK_DNODE 70 #define TK_TOPIC 70
#define TK_USER 71 #define TK_FUNCTION 71
#define TK_ACCOUNT 72 #define TK_DNODE 72
#define TK_USE 73 #define TK_USER 73
#define TK_DESCRIBE 74 #define TK_ACCOUNT 74
#define TK_ALTER 75 #define TK_USE 75
#define TK_PASS 76 #define TK_DESCRIBE 76
#define TK_PRIVILEGE 77 #define TK_ALTER 77
#define TK_LOCAL 78 #define TK_PASS 78
#define TK_COMPACT 79 #define TK_PRIVILEGE 79
#define TK_LP 80 #define TK_LOCAL 80
#define TK_RP 81 #define TK_COMPACT 81
#define TK_IF 82 #define TK_LP 82
#define TK_EXISTS 83 #define TK_RP 83
#define TK_PPS 84 #define TK_IF 84
#define TK_TSERIES 85 #define TK_EXISTS 85
#define TK_DBS 86 #define TK_AS 86
#define TK_STORAGE 87 #define TK_OUTPUTTYPE 87
#define TK_QTIME 88 #define TK_AGGREGATE 88
#define TK_CONNS 89 #define TK_BUFSIZE 89
#define TK_STATE 90 #define TK_PPS 90
#define TK_COMMA 91 #define TK_TSERIES 91
#define TK_KEEP 92 #define TK_DBS 92
#define TK_CACHE 93 #define TK_STORAGE 93
#define TK_REPLICA 94 #define TK_QTIME 94
#define TK_QUORUM 95 #define TK_CONNS 95
#define TK_DAYS 96 #define TK_STATE 96
#define TK_MINROWS 97 #define TK_COMMA 97
#define TK_MAXROWS 98 #define TK_KEEP 98
#define TK_BLOCKS 99 #define TK_CACHE 99
#define TK_CTIME 100 #define TK_REPLICA 100
#define TK_WAL 101 #define TK_QUORUM 101
#define TK_FSYNC 102 #define TK_DAYS 102
#define TK_COMP 103 #define TK_MINROWS 103
#define TK_PRECISION 104 #define TK_MAXROWS 104
#define TK_UPDATE 105 #define TK_BLOCKS 105
#define TK_CACHELAST 106 #define TK_CTIME 106
#define TK_PARTITIONS 107 #define TK_WAL 107
#define TK_UNSIGNED 108 #define TK_FSYNC 108
#define TK_TAGS 109 #define TK_COMP 109
#define TK_USING 110 #define TK_PRECISION 110
#define TK_AS 111 #define TK_UPDATE 111
#define TK_NULL 112 #define TK_CACHELAST 112
#define TK_NOW 113 #define TK_PARTITIONS 113
#define TK_SELECT 114 #define TK_UNSIGNED 114
#define TK_UNION 115 #define TK_TAGS 115
#define TK_ALL 116 #define TK_USING 116
#define TK_DISTINCT 117 #define TK_NULL 117
#define TK_FROM 118 #define TK_NOW 118
#define TK_VARIABLE 119 #define TK_SELECT 119
#define TK_INTERVAL 120 #define TK_UNION 120
#define TK_SESSION 121 #define TK_ALL 121
#define TK_STATE_WINDOW 122 #define TK_DISTINCT 122
#define TK_FILL 123 #define TK_FROM 123
#define TK_SLIDING 124 #define TK_VARIABLE 124
#define TK_ORDER 125 #define TK_INTERVAL 125
#define TK_BY 126 #define TK_SESSION 126
#define TK_ASC 127 #define TK_STATE_WINDOW 127
#define TK_DESC 128 #define TK_FILL 128
#define TK_GROUP 129 #define TK_SLIDING 129
#define TK_HAVING 130 #define TK_ORDER 130
#define TK_LIMIT 131 #define TK_BY 131
#define TK_OFFSET 132 #define TK_ASC 132
#define TK_SLIMIT 133 #define TK_DESC 133
#define TK_SOFFSET 134 #define TK_GROUP 134
#define TK_WHERE 135 #define TK_HAVING 135
#define TK_RESET 136 #define TK_LIMIT 136
#define TK_QUERY 137 #define TK_OFFSET 137
#define TK_SYNCDB 138 #define TK_SLIMIT 138
#define TK_ADD 139 #define TK_SOFFSET 139
#define TK_COLUMN 140 #define TK_WHERE 140
#define TK_MODIFY 141 #define TK_RESET 141
#define TK_TAG 142 #define TK_QUERY 142
#define TK_CHANGE 143 #define TK_SYNCDB 143
#define TK_SET 144 #define TK_ADD 144
#define TK_KILL 145 #define TK_COLUMN 145
#define TK_CONNECTION 146 #define TK_MODIFY 146
#define TK_STREAM 147 #define TK_TAG 147
#define TK_COLON 148 #define TK_CHANGE 148
#define TK_ABORT 149 #define TK_SET 149
#define TK_AFTER 150 #define TK_KILL 150
#define TK_ATTACH 151 #define TK_CONNECTION 151
#define TK_BEFORE 152 #define TK_STREAM 152
#define TK_BEGIN 153 #define TK_COLON 153
#define TK_CASCADE 154 #define TK_ABORT 154
#define TK_CLUSTER 155 #define TK_AFTER 155
#define TK_CONFLICT 156 #define TK_ATTACH 156
#define TK_COPY 157 #define TK_BEFORE 157
#define TK_DEFERRED 158 #define TK_BEGIN 158
#define TK_DELIMITERS 159 #define TK_CASCADE 159
#define TK_DETACH 160 #define TK_CLUSTER 160
#define TK_EACH 161 #define TK_CONFLICT 161
#define TK_END 162 #define TK_COPY 162
#define TK_EXPLAIN 163 #define TK_DEFERRED 163
#define TK_FAIL 164 #define TK_DELIMITERS 164
#define TK_FOR 165 #define TK_DETACH 165
#define TK_IGNORE 166 #define TK_EACH 166
#define TK_IMMEDIATE 167 #define TK_END 167
#define TK_INITIALLY 168 #define TK_EXPLAIN 168
#define TK_INSTEAD 169 #define TK_FAIL 169
#define TK_MATCH 170 #define TK_FOR 170
#define TK_KEY 171 #define TK_IGNORE 171
#define TK_OF 172 #define TK_IMMEDIATE 172
#define TK_RAISE 173 #define TK_INITIALLY 173
#define TK_REPLACE 174 #define TK_INSTEAD 174
#define TK_RESTRICT 175 #define TK_MATCH 175
#define TK_ROW 176 #define TK_KEY 176
#define TK_STATEMENT 177 #define TK_OF 177
#define TK_TRIGGER 178 #define TK_RAISE 178
#define TK_VIEW 179 #define TK_REPLACE 179
#define TK_SEMI 180 #define TK_RESTRICT 180
#define TK_NONE 181 #define TK_ROW 181
#define TK_PREV 182 #define TK_STATEMENT 182
#define TK_LINEAR 183 #define TK_TRIGGER 183
#define TK_IMPORT 184 #define TK_VIEW 184
#define TK_TBNAME 185 #define TK_SEMI 185
#define TK_JOIN 186 #define TK_NONE 186
#define TK_INSERT 187 #define TK_PREV 187
#define TK_INTO 188 #define TK_LINEAR 188
#define TK_VALUES 189 #define TK_IMPORT 189
#define TK_TBNAME 190
#define TK_JOIN 191
#define TK_INSERT 192
#define TK_INTO 193
#define TK_VALUES 194
#define TK_SPACE 300 #define TK_SPACE 300
......
...@@ -28,6 +28,10 @@ typedef struct tstr { ...@@ -28,6 +28,10 @@ typedef struct tstr {
#define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len))
#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) #define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR))
#define varDataNetLen(v) (htons(((VarDataLenT *)(v))[0]))
#define varDataNetTLen(v) (sizeof(VarDataLenT) + varDataNetLen(v))
// this data type is internally used only in 'in' query to hold the values // this data type is internally used only in 'in' query to hold the values
#define TSDB_DATA_TYPE_ARRAY (1000) #define TSDB_DATA_TYPE_ARRAY (1000)
......
...@@ -49,7 +49,7 @@ typedef struct { ...@@ -49,7 +49,7 @@ typedef struct {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
SRspRet rspRet; SRspRet rspRet;
char reserveForSync[24]; char reserveForSync[24];
SWalHead pHead; SWalHead walHead;
} SVWriteMsg; } SVWriteMsg;
// vnodeStatus // vnodeStatus
......
...@@ -19,9 +19,9 @@ ELSE () ...@@ -19,9 +19,9 @@ ELSE ()
ENDIF () ENDIF ()
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(shell taos_static ${LINK_JEMALLOC}) TARGET_LINK_LIBRARIES(shell taos_static lua ${LINK_JEMALLOC})
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(shell taos ${LINK_JEMALLOC}) TARGET_LINK_LIBRARIES(shell taos lua ${LINK_JEMALLOC})
ENDIF () ENDIF ()
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos) SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
......
...@@ -67,7 +67,7 @@ IF (TD_LINUX) ...@@ -67,7 +67,7 @@ IF (TD_LINUX)
ADD_EXECUTABLE(taosdemo ${SRC}) ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson ${LINK_JEMALLOC}) TARGET_LINK_LIBRARIES(taosdemo taos_static cJson lua ${LINK_JEMALLOC})
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC}) TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC})
ENDIF () ENDIF ()
...@@ -76,9 +76,9 @@ ELSEIF (TD_WINDOWS) ...@@ -76,9 +76,9 @@ ELSEIF (TD_WINDOWS)
ADD_EXECUTABLE(taosdemo ${SRC}) ADD_EXECUTABLE(taosdemo ${SRC})
SET_SOURCE_FILES_PROPERTIES(./taosdemo.c PROPERTIES COMPILE_FLAGS -w) SET_SOURCE_FILES_PROPERTIES(./taosdemo.c PROPERTIES COMPILE_FLAGS -w)
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson) TARGET_LINK_LIBRARIES(taosdemo taos_static cJson lua)
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson) TARGET_LINK_LIBRARIES(taosdemo taos cJson lua)
ENDIF () ENDIF ()
ELSEIF (TD_DARWIN) ELSEIF (TD_DARWIN)
# missing a few dependencies, such as <argp.h> # missing a few dependencies, such as <argp.h>
...@@ -86,9 +86,9 @@ ELSEIF (TD_DARWIN) ...@@ -86,9 +86,9 @@ ELSEIF (TD_DARWIN)
ADD_EXECUTABLE(taosdemo ${SRC}) ADD_EXECUTABLE(taosdemo ${SRC})
IF (TD_SOMODE_STATIC) IF (TD_SOMODE_STATIC)
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson) TARGET_LINK_LIBRARIES(taosdemo taos_static cJson lua)
ELSE () ELSE ()
TARGET_LINK_LIBRARIES(taosdemo taos cJson) TARGET_LINK_LIBRARIES(taosdemo taos cJson lua)
ENDIF () ENDIF ()
ENDIF () ENDIF ()
此差异已折叠。
...@@ -214,6 +214,23 @@ typedef struct SUserObj { ...@@ -214,6 +214,23 @@ typedef struct SUserObj {
struct SAcctObj * pAcct; struct SAcctObj * pAcct;
} SUserObj; } SUserObj;
typedef struct SFuncObj {
char name[TSDB_FUNC_NAME_LEN];
char path[128];
int32_t contLen;
char cont[TSDB_FUNC_CODE_LEN];
int32_t funcType;
int32_t bufSize;
int64_t createdTime;
uint8_t resType;
int16_t resBytes;
int64_t sig; // partial md5 sign
int16_t type; // [lua script|so|js]
int8_t reserved[64];
int8_t updateEnd[4];
int32_t refCount;
} SFuncObj;
typedef struct { typedef struct {
int64_t totalStorage; // Total storage wrtten from this account int64_t totalStorage; // Total storage wrtten from this account
int64_t compStorage; // Compressed storage on disk int64_t compStorage; // Compressed storage on disk
...@@ -258,7 +275,7 @@ typedef struct { ...@@ -258,7 +275,7 @@ typedef struct {
void * pIter; void * pIter;
void ** ppShow; void ** ppShow;
int16_t offset[TSDB_MAX_COLUMNS]; int16_t offset[TSDB_MAX_COLUMNS];
int16_t bytes[TSDB_MAX_COLUMNS]; int32_t bytes[TSDB_MAX_COLUMNS];
int32_t numOfReads; int32_t numOfReads;
int8_t maxReplica; int8_t maxReplica;
int8_t reserved0[1]; int8_t reserved0[1];
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_MNODE_FUNC_H
#define TDENGINE_MNODE_FUNC_H
#ifdef __cplusplus
extern "C" {
#endif
#include "mnodeDef.h"
int32_t mnodeInitFuncs();
void mnodeCleanupFuncs();
SFuncObj *mnodeGetFunc(char *name);
void * mnodeGetNextFunc(void *pIter, SFuncObj **pFunc);
void mnodeCancelGetNextFunc(void *pIter);
void mnodeIncFuncRef(SFuncObj *pFunc);
void mnodeDecFuncRef(SFuncObj *pFunc);
int32_t mnodeCreateFunc(SAcctObj *pAcct, char *name, int32_t codeLen, char *code, char *path, uint8_t outputType, int16_t outputLen, int32_t funcType, int32_t bufSize, SMnodeMsg *pMsg);
#ifdef __cplusplus
}
#endif
#endif
...@@ -33,7 +33,8 @@ typedef enum { ...@@ -33,7 +33,8 @@ typedef enum {
SDB_TABLE_VGROUP = 6, SDB_TABLE_VGROUP = 6,
SDB_TABLE_STABLE = 7, SDB_TABLE_STABLE = 7,
SDB_TABLE_CTABLE = 8, SDB_TABLE_CTABLE = 8,
SDB_TABLE_MAX = 9 SDB_TABLE_FUNC = 9,
SDB_TABLE_MAX = 10
} ESdbTable; } ESdbTable;
typedef enum { typedef enum {
......
...@@ -554,6 +554,9 @@ void mnodeCleanupDbs() { ...@@ -554,6 +554,9 @@ void mnodeCleanupDbs() {
tsDbSdb = NULL; tsDbSdb = NULL;
} }
static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { static int32_t mnodeGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t cols = 0; int32_t cols = 0;
......
此差异已折叠。
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "mnodeSdb.h" #include "mnodeSdb.h"
#include "mnodeVgroup.h" #include "mnodeVgroup.h"
#include "mnodeUser.h" #include "mnodeUser.h"
#include "mnodeFunc.h"
#include "mnodeTable.h" #include "mnodeTable.h"
#include "mnodeCluster.h" #include "mnodeCluster.h"
#include "mnodeShow.h" #include "mnodeShow.h"
...@@ -46,6 +47,7 @@ static SStep tsMnodeSteps[] = { ...@@ -46,6 +47,7 @@ static SStep tsMnodeSteps[] = {
{"cluster", mnodeInitCluster, mnodeCleanupCluster}, {"cluster", mnodeInitCluster, mnodeCleanupCluster},
{"accts", mnodeInitAccts, mnodeCleanupAccts}, {"accts", mnodeInitAccts, mnodeCleanupAccts},
{"users", mnodeInitUsers, mnodeCleanupUsers}, {"users", mnodeInitUsers, mnodeCleanupUsers},
{"funcs", mnodeInitFuncs, mnodeCleanupFuncs},
{"dnodes", mnodeInitDnodes, mnodeCleanupDnodes}, {"dnodes", mnodeInitDnodes, mnodeCleanupDnodes},
{"dbs", mnodeInitDbs, mnodeCleanupDbs}, {"dbs", mnodeInitDbs, mnodeCleanupDbs},
{"vgroups", mnodeInitVgroups, mnodeCleanupVgroups}, {"vgroups", mnodeInitVgroups, mnodeCleanupVgroups},
......
...@@ -42,6 +42,7 @@ ...@@ -42,6 +42,7 @@
#include "mnodeWrite.h" #include "mnodeWrite.h"
#include "mnodeRead.h" #include "mnodeRead.h"
#include "mnodePeer.h" #include "mnodePeer.h"
#include "mnodeFunc.h"
#define ALTER_CTABLE_RETRY_TIMES 3 #define ALTER_CTABLE_RETRY_TIMES 3
#define CREATE_CTABLE_RETRY_TIMES 10 #define CREATE_CTABLE_RETRY_TIMES 10
...@@ -104,6 +105,20 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) { ...@@ -104,6 +105,20 @@ static void mnodeDestroyChildTable(SCTableObj *pTable) {
tfree(pTable); tfree(pTable);
} }
static char* mnodeGetTableShowPattern(SShowObj *pShow) {
char* pattern = NULL;
if (pShow != NULL && pShow->payloadLen > 0) {
pattern = (char*)malloc(pShow->payloadLen + 1);
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
memcpy(pattern, pShow->payload, pShow->payloadLen);
pattern[pShow->payloadLen] = 0;
}
return pattern;
}
static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) { static int32_t mnodeChildTableActionDestroy(SSdbRow *pRow) {
mnodeDestroyChildTable(pRow->pObj); mnodeDestroyChildTable(pRow->pObj);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -1620,6 +1635,11 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1620,6 +1635,11 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER; SPatternCompareInfo info = PATTERN_COMPARE_INFO_INITIALIZER;
char stableName[TSDB_TABLE_NAME_LEN] = {0}; char stableName[TSDB_TABLE_NAME_LEN] = {0};
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextSuperTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextSuperTable(pShow->pIter, &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -1631,7 +1651,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1631,7 +1651,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
memset(stableName, 0, tListLen(stableName)); memset(stableName, 0, tListLen(stableName));
mnodeExtractTableName(pTable->info.tableId, stableName); mnodeExtractTableName(pTable->info.tableId, stableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pShow->payloadLen > 0 && patternMatch(pattern, stableName, sizeof(stableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -1671,6 +1691,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, ...@@ -1671,6 +1691,7 @@ int32_t mnodeRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows,
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern);
return numOfRows; return numOfRows;
} }
...@@ -2892,7 +2913,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray ...@@ -2892,7 +2913,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
(*totalMallocLen) *= 2; (*totalMallocLen) *= 2;
} }
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen); pMultiMeta = realloc(pMultiMeta, *totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
return NULL; return NULL;
} }
...@@ -2906,6 +2927,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2906,6 +2927,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pInfo->numOfTables = htonl(pInfo->numOfTables); pInfo->numOfTables = htonl(pInfo->numOfTables);
pInfo->numOfVgroups = htonl(pInfo->numOfVgroups); pInfo->numOfVgroups = htonl(pInfo->numOfVgroups);
pInfo->numOfUdfs = htonl(pInfo->numOfUdfs);
int32_t contLen = pMsg->rpcMsg.contLen - sizeof(SMultiTableInfoMsg); int32_t contLen = pMsg->rpcMsg.contLen - sizeof(SMultiTableInfoMsg);
...@@ -2914,17 +2936,17 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2914,17 +2936,17 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
char* str = strndup(pInfo->tableNames, contLen); char* str = strndup(pInfo->tableNames, contLen);
char** nameList = strsplit(str, ",", &num); char** nameList = strsplit(str, ",", &num);
SArray* pList = taosArrayInit(4, POINTER_BYTES); SArray* pList = taosArrayInit(4, POINTER_BYTES);
SMultiTableMeta *pMultiMeta = NULL;
if (num != pInfo->numOfTables + pInfo->numOfVgroups) { SMultiTableMeta *pMultiMeta = NULL;
if (num != pInfo->numOfTables + pInfo->numOfVgroups + pInfo->numOfUdfs) {
mError("msg:%p, app:%p, failed to get multi-tableMeta, msg inconsistent", pMsg, pMsg->rpcMsg.ahandle); mError("msg:%p, app:%p, failed to get multi-tableMeta, msg inconsistent", pMsg, pMsg->rpcMsg.ahandle);
code = TSDB_CODE_MND_INVALID_TABLE_NAME; code = TSDB_CODE_MND_INVALID_TABLE_NAME;
goto _end; goto _end;
} }
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size // first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16); int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
pMultiMeta = rpcMallocCont(totalMallocLen); pMultiMeta = calloc(1, totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end; goto _end;
...@@ -2957,7 +2979,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2957,7 +2979,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
int remain = totalMallocLen - pMultiMeta->contLen; int remain = totalMallocLen - pMultiMeta->contLen;
if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) { if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
totalMallocLen *= 2; totalMallocLen *= 2;
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen); pMultiMeta = realloc(pMultiMeta, totalMallocLen);
if (pMultiMeta == NULL) { if (pMultiMeta == NULL) {
mnodeDecTableRef(pMsg->pTable); mnodeDecTableRef(pMsg->pTable);
code = TSDB_CODE_MND_OUT_OF_MEMORY; code = TSDB_CODE_MND_OUT_OF_MEMORY;
...@@ -2991,8 +3013,10 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -2991,8 +3013,10 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
} }
} }
int32_t tableNum = pInfo->numOfTables + pInfo->numOfVgroups;
// add the additional super table names that needs the vgroup info // add the additional super table names that needs the vgroup info
for(;t < num; ++t) { for(;t < tableNum; ++t) {
taosArrayPush(pList, &nameList[t]); taosArrayPush(pList, &nameList[t]);
} }
...@@ -3023,20 +3047,77 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) { ...@@ -3023,20 +3047,77 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta); pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables); pMultiMeta->numOfTables = htonl(pMultiMeta->numOfTables);
// add the user-defined-function information
for(int32_t i = 0; i < pInfo->numOfUdfs; ++i, ++t) {
char buf[TSDB_FUNC_NAME_LEN] = {0};
strcpy(buf, nameList[t]);
SFuncObj* pFuncObj = mnodeGetFunc(buf);
if (pFuncObj == NULL) {
mError("function %s does not exist", buf);
code = TSDB_CODE_MND_INVALID_FUNC;
goto _end;
}
SFunctionInfoMsg* pFuncInfo = (SFunctionInfoMsg*) msg;
strcpy(pFuncInfo->name, buf);
pFuncInfo->len = htonl(pFuncObj->contLen);
memcpy(pFuncInfo->content, pFuncObj->cont, pFuncObj->contLen);
pFuncInfo->funcType = htonl(pFuncObj->funcType);
pFuncInfo->resType = pFuncObj->resType;
pFuncInfo->resBytes = htons(pFuncObj->resBytes);
pFuncInfo->bufSize = htonl(pFuncObj->bufSize);
msg += sizeof(SFunctionInfoMsg) + pFuncObj->contLen;
}
pMultiMeta->contLen = (int32_t) (msg - (char*) pMultiMeta);
pMultiMeta->numOfUdf = htonl(pInfo->numOfUdfs);
pMsg->rpcRsp.rsp = pMultiMeta; pMsg->rpcRsp.rsp = pMultiMeta;
pMsg->rpcRsp.len = pMultiMeta->contLen; pMsg->rpcRsp.len = pMultiMeta->contLen;
code = TSDB_CODE_SUCCESS; code = TSDB_CODE_SUCCESS;
char* tmp = rpcMallocCont(pMultiMeta->contLen + 2);
if (tmp == NULL) {
code = TSDB_CODE_MND_OUT_OF_MEMORY;
goto _end;
}
int32_t dataLen = (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta);
int32_t len = tsCompressString(pMultiMeta->meta, dataLen, 1, tmp + sizeof(SMultiTableMeta), (int32_t)dataLen + 2,
ONE_STAGE_COMP, NULL, 0);
pMultiMeta->metaClone = pInfo->metaClone;
pMultiMeta->rawLen = pMultiMeta->contLen;
if (len == -1 || len >= dataLen + 2) { // compress failed, do not compress this binary data
pMultiMeta->compressed = 0;
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta) + pMultiMeta->contLen);
} else {
pMultiMeta->compressed = 1;
pMultiMeta->contLen = sizeof(SMultiTableMeta) + len;
// copy the header and the compressed payload
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta));
}
pMsg->rpcRsp.rsp = tmp;
pMsg->rpcRsp.len = pMultiMeta->contLen;
SMultiTableMeta* p = (SMultiTableMeta*) tmp;
mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed);
_end: _end:
tfree(str); tfree(str);
tfree(nameList); tfree(nameList);
taosArrayDestroy(pList); taosArrayDestroy(pList);
pMsg->pTable = NULL; pMsg->pTable = NULL;
pMsg->pVgroup = NULL; pMsg->pVgroup = NULL;
tfree(pMultiMeta);
if (code != TSDB_CODE_SUCCESS) {
rpcFreeCont(pMultiMeta);
}
return code; return code;
} }
...@@ -3132,16 +3213,10 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows ...@@ -3132,16 +3213,10 @@ static int32_t mnodeRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows
char prefix[64] = {0}; char prefix[64] = {0};
int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64); int32_t prefixLen = (int32_t)tableIdPrefix(pDb->name, prefix, 64);
char* pattern = NULL; char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0) { if (pShow->payloadLen > 0 && pattern == NULL) {
pattern = (char*)malloc(pShow->payloadLen + 1);
if (pattern == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return 0; return 0;
} }
memcpy(pattern, pShow->payload, pShow->payloadLen);
pattern[pShow->payloadLen] = 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);
...@@ -3372,6 +3447,11 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3372,6 +3447,11 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
strcat(prefix, TS_PATH_DELIMITER); strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix); int32_t prefixLen = (int32_t)strlen(prefix);
char* pattern = mnodeGetTableShowPattern(pShow);
if (pShow->payloadLen > 0 && pattern == NULL) {
return 0;
}
while (numOfRows < rows) { while (numOfRows < rows) {
pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable); pShow->pIter = mnodeGetNextChildTable(pShow->pIter, &pTable);
if (pTable == NULL) break; if (pTable == NULL) break;
...@@ -3387,7 +3467,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3387,7 +3467,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
// pattern compare for table name // pattern compare for table name
mnodeExtractTableName(pTable->info.tableId, tableName); mnodeExtractTableName(pTable->info.tableId, tableName);
if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) { if (pShow->payloadLen > 0 && patternMatch(pattern, tableName, sizeof(tableName) - 1, &info) != TSDB_PATTERN_MATCH) {
mnodeDecTableRef(pTable); mnodeDecTableRef(pTable);
continue; continue;
} }
...@@ -3419,6 +3499,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro ...@@ -3419,6 +3499,7 @@ static int32_t mnodeRetrieveStreamTables(SShowObj *pShow, char *data, int32_t ro
mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow);
mnodeDecDbRef(pDb); mnodeDecDbRef(pDb);
free(pattern);
return numOfRows; return numOfRows;
} }
......
...@@ -37,6 +37,7 @@ extern "C" { ...@@ -37,6 +37,7 @@ extern "C" {
#include "osSysinfo.h" #include "osSysinfo.h"
#include "osTime.h" #include "osTime.h"
#include "osTimer.h" #include "osTimer.h"
#include "osSystem.h"
void osInit(); void osInit();
......
...@@ -77,6 +77,7 @@ extern "C" { ...@@ -77,6 +77,7 @@ extern "C" {
#include "osEok.h" #include "osEok.h"
#else #else
#include <argp.h> #include <argp.h>
#include <dlfcn.h>
#include <endian.h> #include <endian.h>
#include <linux/sysctl.h> #include <linux/sysctl.h>
#include <poll.h> #include <poll.h>
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_OS_SYSTEM_H
#define TDENGINE_OS_SYSTEM_H
#ifdef __cplusplus
extern "C" {
#endif
void* taosLoadDll(const char *filename);
void* taosLoadSym(void* handle, char* name);
void taosCloseDll(void *handle);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tulog.h"
void* taosLoadDll(const char *filename) {
return NULL;
}
void* taosLoadSym(void* handle, char* name) {
return NULL;
}
void taosCloseDll(void *handle) {
}
...@@ -4,4 +4,4 @@ PROJECT(TDengine) ...@@ -4,4 +4,4 @@ PROJECT(TDengine)
AUX_SOURCE_DIRECTORY(. SRC) AUX_SOURCE_DIRECTORY(. SRC)
ADD_LIBRARY(oslinux ${SRC}) ADD_LIBRARY(oslinux ${SRC})
TARGET_LINK_LIBRARIES(oslinux m rt z) TARGET_LINK_LIBRARIES(oslinux m rt z dl)
\ No newline at end of file \ No newline at end of file
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tulog.h"
void* taosLoadDll(const char *filename) {
void *handle = dlopen (filename, RTLD_LAZY);
if (!handle) {
uError("load dll:%s failed, error:%s", filename, dlerror());
return NULL;
}
uDebug("dll %s loaded", filename);
return handle;
}
void* taosLoadSym(void* handle, char* name) {
void* sym = dlsym(handle, name);
char* error = NULL;
if ((error = dlerror()) != NULL) {
uWarn("load sym:%s failed, error:%s", name, dlerror());
return NULL;
}
uDebug("sym %s loaded", name)
return sym;
}
void taosCloseDll(void *handle) {
if (handle) {
dlclose(handle);
}
}
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册