提交 45b86131 编写于 作者: D dapan1121

Merge branch 'develop' into feature/TD-4399

......@@ -26,7 +26,7 @@
| TSDB_CODE_COM_OUT_OF_MEMORY | 0 | 0x0102 | "Out of memory" | -2147483390 |
| TSDB_CODE_COM_INVALID_CFG_MSG | 0 | 0x0103 | "Invalid config message" | -2147483389 |
| TSDB_CODE_COM_FILE_CORRUPTED | 0 | 0x0104 | "Data file corrupted" | -2147483388 |
| TSDB_CODE_TSC_INVALID_SQL | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_OPERATION | 0 | 0x0200 | "Invalid SQL statement" | -2147483136 |
| TSDB_CODE_TSC_INVALID_QHANDLE | 0 | 0x0201 | "Invalid qhandle" | -2147483135 |
| TSDB_CODE_TSC_INVALID_TIME_STAMP | 0 | 0x0202 | "Invalid combination of client/service time" | -2147483134 |
| TSDB_CODE_TSC_INVALID_VALUE | 0 | 0x0203 | "Invalid value in client" | -2147483133 |
......
......@@ -39,39 +39,29 @@ typedef struct SLocalDataSource {
} SLocalDataSource;
typedef struct SLocalMerger {
SLocalDataSource ** pLocalDataSrc;
SLocalDataSource **pLocalDataSrc;
int32_t numOfBuffer;
int32_t numOfCompleted;
int32_t numOfVnode;
SLoserTreeInfo * pLoserTree;
tFilePage * pResultBuf;
int32_t nResultBufSize;
tFilePage * pTempBuffer;
struct SQLFunctionCtx *pCtx;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor * pDesc;
SColumnModel * resColModel;
SColumnModel* finalModel;
tExtMemBuffer ** pExtMemBuffer; // disk-based buffer
bool orderPrjOnSTable; // projection query on stable
SLoserTreeInfo *pLoserTree;
int32_t rowSize; // size of each intermediate result.
tOrderDescriptor *pDesc;
tExtMemBuffer **pExtMemBuffer; // disk-based buffer
char *buf; // temp buffer
} SLocalMerger;
typedef struct SRetrieveSupport {
tExtMemBuffer ** pExtMemBuffer; // for build loser tree
tOrderDescriptor *pOrderDescriptor;
SColumnModel* pFinalColModel; // colModel for final result
SColumnModel* pFFColModel;
int32_t subqueryIndex; // index of current vnode in vnode list
SSqlObj * pParentSql;
tFilePage * localBuffer; // temp buffer, there is a buffer for each vnode to
uint32_t numOfRetry; // record the number of retry times
} SRetrieveSupport;
int32_t tscLocalReducerEnvCreate(SSqlObj *pSql, tExtMemBuffer ***pMemBuffer, tOrderDescriptor **pDesc,
SColumnModel **pFinalModel, SColumnModel** pFFModel, uint32_t nBufferSize);
int32_t tscLocalReducerEnvCreate(SQueryInfo* pQueryInfo, tExtMemBuffer ***pMemBuffer, int32_t numOfSub, tOrderDescriptor **pDesc, uint32_t nBufferSize, int64_t id);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, SColumnModel *pFinalModel, SColumnModel* pFFModel,
int32_t numOfVnodes);
void tscLocalReducerEnvDestroy(tExtMemBuffer **pMemBuffer, tOrderDescriptor *pDesc, int32_t numOfVnodes);
int32_t saveToBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tFilePage *pPage, void *data,
int32_t numOfRows, int32_t orderType);
......@@ -81,12 +71,10 @@ int32_t tscFlushTmpBuffer(tExtMemBuffer *pMemoryBuf, tOrderDescriptor *pDesc, tF
/*
* create local reducer to launch the second-stage reduce process at client site
*/
void tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SColumnModel *finalModel, SColumnModel *pFFModel, SSqlObj* pSql);
int32_t tscCreateLocalMerger(tExtMemBuffer **pMemBuffer, int32_t numOfBuffer, tOrderDescriptor *pDesc,
SQueryInfo *pQueryInfo, SLocalMerger **pMerger, int64_t id);
void tscDestroyLocalMerger(SSqlObj *pSql);
//int32_t tscDoLocalMerge(SSqlObj *pSql);
void tscDestroyLocalMerger(SLocalMerger* pLocalMerger);
#ifdef __cplusplus
}
......
......@@ -20,6 +20,7 @@
extern "C" {
#endif
#include "tsched.h"
#include "exception.h"
#include "os.h"
#include "qExtbuffer.h"
......@@ -36,6 +37,9 @@ extern "C" {
#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
(!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
// this struct is transfered as binary, padding two bytes to avoid
// an 'uid' whose low bytes is 0xff being recoginized as NULL,
......@@ -59,7 +63,7 @@ typedef struct SJoinSupporter {
SArray* exprList;
SFieldInfo fieldsInfo;
STagCond tagCond;
SSqlGroupbyExpr groupInfo; // group by info
SGroupbyExpr groupInfo; // group by info
struct STSBuf* pTSBuf; // the TSBuf struct that holds the compressed timestamp array
FILE* f; // temporary file in order to create TSBuf
char path[PATH_MAX]; // temporary file path, todo dynamic allocate memory
......@@ -90,22 +94,14 @@ typedef struct SVgroupTableInfo {
SArray *itemList; // SArray<STableIdInfo>
} SVgroupTableInfo;
static FORCE_INLINE SQueryInfo* tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex) {
assert(pCmd != NULL && subClauseIndex >= 0);
if (pCmd->pQueryInfo == NULL || subClauseIndex >= pCmd->numOfClause) {
return NULL;
}
return pCmd->pQueryInfo[subClauseIndex];
}
SQueryInfo* tscGetActiveQueryInfo(SSqlCmd* pCmd);
int32_t converToStr(char *str, int type, void *buf, int32_t bufSize, int32_t *len);
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo);
void doRetrieveSubqueryData(SSchedMsg *pMsg);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
......@@ -127,7 +123,7 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
*/
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
bool hasTagValOutput(SQueryInfo* pQueryInfo);
......@@ -136,13 +132,14 @@ bool isStabledev(SQueryInfo* pQueryInfo);
bool isTsCompQuery(SQueryInfo* pQueryInfo);
bool isSimpleAggregate(SQueryInfo* pQueryInfo);
bool isBlockDistQuery(SQueryInfo* pQueryInfo);
int32_t tscGetTopbotQueryParam(SQueryInfo* pQueryInfo);
bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);
bool tscHasColumnFilter(SQueryInfo* pQueryInfo);
bool tscIsTwoStageSTableQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryTags(SQueryInfo* pQueryInfo);
......@@ -150,9 +147,9 @@ bool tscMultiRoundQuery(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscQueryBlockInfo(SQueryInfo* pQueryInfo);
SExprInfo* tscAddFuncInSelectClause(SQueryInfo* pQueryInfo, int32_t outputColIndex, int16_t functionId,
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType);
SColumnIndex* pIndex, SSchema* pColSchema, int16_t colType, int16_t colId);
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pzTableName, SSqlObj* pSql);
int32_t tscSetTableFullName(SName* pName, SStrToken* pzTableName, SSqlObj* pSql);
void tscClearInterpInfo(SQueryInfo* pQueryInfo);
bool tscIsInsertData(char* sqlstr);
......@@ -171,36 +168,49 @@ void tscFieldInfoUpdateOffset(SQueryInfo* pQueryInfo);
int16_t tscFieldInfoGetOffset(SQueryInfo* pQueryInfo, int32_t index);
void tscFieldInfoClear(SFieldInfo* pFieldInfo);
void tscFieldInfoCopy(SFieldInfo* pFieldInfo, const SFieldInfo* pSrc, const SArray* pExprList);
static FORCE_INLINE int32_t tscNumOfFields(SQueryInfo* pQueryInfo) { return pQueryInfo->fieldsInfo.numOfOutput; }
int32_t tscFieldInfoCompare(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2, int32_t *diffSize);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
int32_t tscFieldInfoSetSize(const SFieldInfo* pFieldInfo1, const SFieldInfo* pFieldInfo2);
void addExprParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
int32_t tscGetResRowLength(SArray* pExprList);
SExprInfo* tscSqlExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprInsert(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
SExprInfo* tscExprCreate(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, int32_t colType);
void tscExprAddParams(SSqlExpr* pExpr, char* argument, int32_t type, int32_t bytes);
SExprInfo* tscExprAppend(SQueryInfo* pQueryInfo, int16_t functionId, SColumnIndex* pColIndex, int16_t type,
int16_t size, int16_t resColId, int16_t interSize, bool isTagCol);
SExprInfo* tscSqlExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
SExprInfo* tscExprUpdate(SQueryInfo* pQueryInfo, int32_t index, int16_t functionId, int16_t srcColumnIndex, int16_t type,
int16_t size);
size_t tscSqlExprNumOfExprs(SQueryInfo* pQueryInfo);
void tscInsertPrimaryTsSourceColumn(SQueryInfo* pQueryInfo, uint64_t uid);
SExprInfo* tscSqlExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscSqlExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
void tscSqlExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscSqlExprInfoDestroy(SArray* pExprInfo);
size_t tscNumOfExprs(SQueryInfo* pQueryInfo);
SExprInfo *tscExprGet(SQueryInfo* pQueryInfo, int32_t index);
int32_t tscExprCopy(SArray* dst, const SArray* src, uint64_t uid, bool deepcopy);
int32_t tscExprCopyAll(SArray* dst, const SArray* src, bool deepcopy);
void tscExprAssign(SExprInfo* dst, const SExprInfo* src);
void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
SColumn* tscColumnClone(const SColumn* src);
bool tscColumnExists(SArray* pColumnList, int32_t columnIndex, uint64_t uid);
SColumn* tscColumnListInsert(SArray* pColumnList, int32_t columnIndex, uint64_t uid, SSchema* pSchema);
void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo);
......@@ -222,14 +232,14 @@ void tscGetSrcColumnInfo(SSrcColumnInfo* pColInfo, SQueryInfo* pQueryInfo);
bool tscShouldBeFreed(SSqlObj* pSql);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t subClauseIndex, int32_t tableIndex);
STableMetaInfo* tscGetTableMetaInfoFromCmd(SSqlCmd *pCmd, int32_t tableIndex);
STableMetaInfo* tscGetMetaInfo(SQueryInfo *pQueryInfo, int32_t tableIndex);
void tscInitQueryInfo(SQueryInfo* pQueryInfo);
void tscClearSubqueryInfo(SSqlCmd* pCmd);
int32_t tscAddQueryInfo(SSqlCmd *pCmd);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd, int32_t subClauseIndex);
SQueryInfo *tscGetQueryInfo(SSqlCmd* pCmd);
SQueryInfo *tscGetQueryInfoS(SSqlCmd *pCmd);
void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo);
......@@ -243,12 +253,11 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables);
void tscRemoveVgroupTableGroup(SArray* pVgroupTable, int32_t index);
void tscVgroupTableCopy(SVgroupTableInfo* info, SVgroupTableInfo* pInfo);
int tscGetSTableVgroupInfo(SSqlObj* pSql, int32_t clauseIndex);
int tscGetSTableVgroupInfo(SSqlObj* pSql, SQueryInfo* pQueryInfo);
int tscGetTableMeta(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo);
int tscGetTableMetaEx(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, bool createIfNotExists);
void tscResetForNextRetrieve(SSqlRes* pRes);
void tscDoQuery(SSqlObj* pSql);
void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
void doExecuteQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo);
......@@ -279,7 +288,7 @@ void registerSqlObj(SSqlObj* pSql);
SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, __async_cb_func_t fp, void* param, int32_t cmd, SSqlObj* pPrevSql);
void addGroupInfoForSubquery(SSqlObj* pParentObj, SSqlObj* pSql, int32_t subClauseIndex, int32_t tableIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex);
void doAddGroupColumnForSubquery(SQueryInfo* pQueryInfo, int32_t tagIndex, SSqlCmd* pCmd);
int16_t tscGetJoinTagColIdByUid(STagCond* pTagCond, uint64_t uid);
int16_t tscGetTagColIndexById(STableMeta* pTableMeta, int16_t colId);
......@@ -295,6 +304,11 @@ void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscAsyncQuerySingleRowForNextVnode(void *param, TAOS_RES *tres, int numOfRows);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
int tscSetMgmtEpSetFromCfg(const char *first, const char *second, SRpcCorEpSet *corEpSet);
int32_t getMultiTableMetaFromMnode(SSqlObj *pSql, SArray* pNameList, SArray* pVgroupNameList, __async_cb_func_t fp);
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray);
bool subAndCheckDone(SSqlObj *pSql, SSqlObj *pParentSql, int idx);
bool tscSetSqlOwner(SSqlObj* pSql);
void tscClearSqlOwner(SSqlObj* pSql);
......@@ -309,10 +323,12 @@ CChildTableMeta* tscCreateChildMeta(STableMeta* pTableMeta);
uint32_t tscGetTableMetaMaxSize();
int32_t tscCreateTableMetaFromSTableMeta(STableMeta* pChild, const char* name, void* buf);
STableMeta* tscTableMetaDup(STableMeta* pTableMeta);
SVgroupsInfo* tscVgroupsInfoDup(SVgroupsInfo* pVgroupsInfo);
int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr);
void tsCreateSQLFunctionCtx(SQueryInfo* pQueryInfo, SQLFunctionCtx* pCtx, SSchema* pSchema);
void* createQueryInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* createQInfoFromQueryNode(SQueryInfo* pQueryInfo, SExprInfo* pExprs, STableGroupInfo* pTableGroupInfo, SOperatorInfo* pOperator, char* sql, void* addr, int32_t stage);
void* malloc_throw(size_t size);
void* calloc_throw(size_t nmemb, size_t size);
......
......@@ -42,12 +42,6 @@ extern "C" {
struct SSqlInfo;
struct SLocalMerger;
// data source from sql string or from file
enum {
DATA_FROM_SQL_STRING = 1,
DATA_FROM_DATA_FILE = 2,
};
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct STableComInfo {
......@@ -85,10 +79,10 @@ typedef struct STableMeta {
} STableMeta;
typedef struct STableMetaInfo {
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
STableMeta *pTableMeta; // table meta, cached in client side and acquired by name
uint32_t tableMetaSize;
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
SVgroupsInfo *vgroupList;
SArray *pVgroupTables; // SArray<SVgroupTableInfo>
/*
* 1. keep the vgroup index during the multi-vnode super table projection query
......@@ -137,8 +131,8 @@ typedef struct SJoinNode {
} SJoinNode;
typedef struct SJoinInfo {
bool hasJoin;
SJoinNode* joinTables[TSDB_MAX_JOIN_TABLE_NUM];
bool hasJoin;
SJoinNode *joinTables[TSDB_MAX_JOIN_TABLE_NUM];
} SJoinInfo;
typedef struct STagCond {
......@@ -205,10 +199,11 @@ typedef struct SQueryInfo {
SInterval interval; // tumble time window
SSessionWindow sessionWindow; // session time window
SSqlGroupbyExpr groupbyExpr; // groupby tags info
SGroupbyExpr groupbyExpr; // groupby tags info
SArray * colList; // SArray<SColumn*>
SFieldInfo fieldsInfo;
SArray * exprList; // SArray<SExprInfo*>
SArray * exprList1; // final exprlist in case of arithmetic expression exists
SLimitVal limit;
SLimitVal slimit;
STagCond tagCond;
......@@ -232,30 +227,50 @@ typedef struct SQueryInfo {
int32_t bufLen;
char* buf;
SQInfo* pQInfo; // global merge operator
SArray* pDSOperator; // data source operator
SArray* pPhyOperator; // physical query execution plan
SQueryAttr* pQueryAttr; // query object
struct SQueryInfo *sibling; // sibling
SArray *pUpstream; // SArray<struct SQueryInfo>
struct SQueryInfo *pDownstream;
int32_t havingFieldNum;
bool stableQuery;
bool groupbyColumn;
bool simpleAgg;
bool arithmeticOnAgg;
bool projectionQuery;
bool hasFilter;
bool onlyTagQuery;
} SQueryInfo;
typedef struct {
STableMeta *pTableMeta;
SVgroupsInfo *pVgroupInfo;
} STableMetaVgroupInfo;
typedef struct SInsertStatementParam {
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables; // number of tables in table name list
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int8_t schemaAttached; // denote if submit block is built with table schema or not
STagData tagData; // NOTE: pTagData->data is used as a variant length array
char msg[512]; // error message
char *sql; // current sql statement position
uint32_t insertType; // insert data from [file|sql statement| bound statement]
} SInsertStatementParam;
// TODO extract sql parser supporter
typedef struct {
int command;
uint8_t msgType;
SInsertStatementParam insertParam;
char reserve1[3]; // fix bus error on arm32
bool autoCreated; // create table if it is not existed during retrieve table meta in mnode
union {
int32_t count;
int32_t numOfTablesInSubmit;
};
uint32_t insertType; // TODO remove it
char * curSql; // current sql, resume position of sql after parsing paused
int8_t parseFinished;
char reserve2[3]; // fix bus error on arm32
int16_t numOfCols;
......@@ -264,25 +279,13 @@ typedef struct {
char * payload;
int32_t payloadLen;
SQueryInfo **pQueryInfo;
int32_t numOfClause;
int32_t clauseIndex; // index of multiple subclause query
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
SHashObj *pTableMetaMap; // local buffer to keep the queried table meta, before validating the AST
SQueryInfo *pQueryInfo;
SQueryInfo *active; // current active query info
int32_t batchSize; // for parameter ('?') binding and batch processing
int32_t numOfParams;
int8_t dataSourceType; // load data from file or not
char reserve4[3]; // fix bus error on arm32
int8_t submitSchema; // submit block is built with table schema
char reserve5[3]; // fix bus error on arm32
STagData tagData; // NOTE: pTagData->data is used as a variant length array
SName **pTableNameList; // all involved tableMeta list of current insert sql statement.
int32_t numOfTables;
SHashObj *pTableBlockHashList; // data block for each table
SArray *pDataBlocks; // SArray<STableDataBlocks*>. Merged submit block for each vgroup
int32_t resColumnId;
} SSqlCmd;
typedef struct SResRec {
......@@ -443,7 +446,7 @@ int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void handleDownstreamOperator(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlRes* pOutput);
void destroyTableNameList(SSqlCmd* pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
......@@ -489,7 +492,7 @@ char *tscGetErrorMsgPayload(SSqlCmd *pCmd);
int32_t tscInvalidSQLErrMsg(char *msg, const char *additionalInfo, const char *sql);
int32_t tscSQLSyntaxErrMsg(char* msg, const char* additionalInfo, const char* sql);
int32_t tscToSQLCmd(SSqlObj *pSql, struct SSqlInfo *pInfo);
int32_t tscValidateSqlInfo(SSqlObj *pSql, struct SSqlInfo *pInfo);
extern int32_t sentinel;
extern SHashObj *tscVgroupMap;
......@@ -505,7 +508,7 @@ extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SQueryInfo* pQueryInfo);
int16_t getNewResColId(SSqlCmd* pCmd);
#ifdef __cplusplus
}
......
......@@ -218,11 +218,19 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_executeBatchImp(J
/*
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: executeBatchImp
* Method: closeStmt
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv *env, jobject jobj, jlong stmt, jlong con);
/**
* Class: com_taosdata_jdbc_TSDBJNIConnector
* Method: setTableNameTagsImp
* Signature: (JLjava/lang/String;I[B[B[B[BJ)I
*/
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp
(JNIEnv *, jobject, jlong, jstring, jint, jbyteArray, jbyteArray, jbyteArray, jbyteArray, jlong);
#ifdef __cplusplus
}
#endif
......
......@@ -749,7 +749,6 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setBindTableNameI
}
jniDebug("jobj:%p, conn:%p, set stmt bind table name:%s", jobj, tsconn, name);
(*env)->ReleaseStringUTFChars(env, jname, name);
return JNI_SUCCESS;
}
......@@ -762,7 +761,7 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
TAOS_STMT *pStmt = (TAOS_STMT *)stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt", jobj, tscon);
return JNI_SQL_NULL;
......@@ -777,14 +776,14 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
}
len = (*env)->GetArrayLength(env, lengthList);
char *lengthArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
char *lengthArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte *)lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
char *nullArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
char *nullArray = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte *)nullArray);
if ((*env)->ExceptionCheck(env)) {
}
......@@ -799,22 +798,10 @@ JNIEXPORT jlong JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_bindColDataImp(J
b->length = (int32_t*)lengthArray;
// set the length and is_null array
switch(dataType) {
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT: {
int32_t bytes = tDataTypes[dataType].bytes;
for(int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
break;
}
case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_BINARY: {
// do nothing
if (!IS_VAR_DATA_TYPE(dataType)) {
int32_t bytes = tDataTypes[dataType].bytes;
for (int32_t i = 0; i < numOfRows; ++i) {
b->length[i] = bytes;
}
}
......@@ -878,3 +865,74 @@ JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_closeStmt(JNIEnv
jniDebug("jobj:%p, conn:%p, stmt closed", jobj, tscon);
return JNI_SUCCESS;
}
JNIEXPORT jint JNICALL Java_com_taosdata_jdbc_TSDBJNIConnector_setTableNameTagsImp(JNIEnv *env, jobject jobj,
jlong stmt, jstring tableName, jint numOfTags, jbyteArray tags, jbyteArray typeList, jbyteArray lengthList, jbyteArray nullList, jlong conn) {
TAOS *tsconn = (TAOS *)conn;
if (tsconn == NULL) {
jniError("jobj:%p, connection already closed", jobj);
return JNI_CONNECTION_NULL;
}
TAOS_STMT* pStmt = (TAOS_STMT*) stmt;
if (pStmt == NULL) {
jniError("jobj:%p, conn:%p, invalid stmt handle", jobj, tsconn);
return JNI_SQL_NULL;
}
jsize len = (*env)->GetArrayLength(env, tags);
char *tagsData = (char *)calloc(1, len);
(*env)->GetByteArrayRegion(env, tags, 0, len, (jbyte *)tagsData);
if ((*env)->ExceptionCheck(env)) {
// todo handle error
}
len = (*env)->GetArrayLength(env, lengthList);
int64_t *lengthArray = (int64_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, lengthList, 0, len, (jbyte*) lengthArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, typeList);
char *typeArray = (char*) calloc(1, len);
(*env)->GetByteArrayRegion(env, typeList, 0, len, (jbyte*) typeArray);
if ((*env)->ExceptionCheck(env)) {
}
len = (*env)->GetArrayLength(env, nullList);
int32_t *nullArray = (int32_t*) calloc(1, len);
(*env)->GetByteArrayRegion(env, nullList, 0, len, (jbyte*) nullArray);
if ((*env)->ExceptionCheck(env)) {
}
const char *name = (*env)->GetStringUTFChars(env, tableName, NULL);
char* curTags = tagsData;
TAOS_BIND *tagsBind = calloc(numOfTags, sizeof(TAOS_BIND));
for(int32_t i = 0; i < numOfTags; ++i) {
tagsBind[i].buffer_type = typeArray[i];
tagsBind[i].buffer = curTags;
tagsBind[i].is_null = &nullArray[i];
tagsBind[i].length = (uintptr_t*) &lengthArray[i];
curTags += lengthArray[i];
}
int32_t code = taos_stmt_set_tbname_tags((void*)stmt, name, tagsBind);
int32_t nTags = (int32_t) numOfTags;
jniDebug("jobj:%p, conn:%p, set table name:%s, numOfTags:%d", jobj, tsconn, name, nTags);
tfree(tagsData);
tfree(lengthArray);
tfree(typeArray);
tfree(nullArray);
(*env)->ReleaseStringUTFChars(env, tableName, name);
if (code != TSDB_CODE_SUCCESS) {
jniError("jobj:%p, conn:%p, code:%s", jobj, tsconn, tstrerror(code));
return JNI_TDENGINE_ERROR;
}
return JNI_SUCCESS;
}
......@@ -59,6 +59,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
pCmd->curSql = pSql->sqlstr;
pCmd->resColumnId = TSDB_RES_COL_ID;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) return;
......@@ -69,7 +70,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, __async_cb_func_t fp, void* para
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo);
}
......@@ -127,7 +128,8 @@ static void tscAsyncFetchRowsProxy(void *param, TAOS_RES *tres, int numOfRows) {
* all available virtual node has been checked already, now we need to check
* for the next subclause queries
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling;
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -220,6 +222,17 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
tscResetForNextRetrieve(pRes);
// handle the sub queries of join query
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (pQueryInfo->pUpstream != NULL && taosArrayGetSize(pQueryInfo->pUpstream) > 0) {
SSchedMsg schedMsg = {0};
schedMsg.fp = doRetrieveSubqueryData;
schedMsg.ahandle = (void *)pSql;
schedMsg.thandle = (void *)1;
schedMsg.msg = 0;
taosScheduleTask(tscQhandle, &schedMsg);
return;
}
if (pCmd->command == TSDB_SQL_TABLE_JOIN_RETRIEVE) {
tscFetchDatablockForSubquery(pSql);
} else if (pRes->completed) {
......@@ -231,7 +244,8 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
* all available virtual nodes in current clause has been checked already, now try the
* next one in the following union subclause
*/
if (pCmd->clauseIndex < pCmd->numOfClause - 1) {
if (pCmd->active->sibling != NULL) {
pCmd->active = pCmd->active->sibling; // todo refactor
tscTryQueryNextClause(pSql, tscAsyncQueryRowsForNextVnode);
return;
}
......@@ -255,7 +269,7 @@ void taos_fetch_rows_a(TAOS_RES *tres, __async_cb_func_t fp, void *param) {
pCmd->command = (pCmd->command > TSDB_SQL_MGMT) ? TSDB_SQL_RETRIEVE : TSDB_SQL_FETCH;
}
SQueryInfo* pQueryInfo1 = tscGetActiveQueryInfo(&pSql->cmd);
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(&pSql->cmd);
tscBuildAndSendRequest(pSql, pQueryInfo1);
}
}
......@@ -317,26 +331,38 @@ static int32_t updateMetaBeforeRetryQuery(SSqlObj* pSql, STableMetaInfo* pTableM
// update the pExpr info, colList info, number of table columns
// TODO Re-parse this sql and issue the corresponding subquery as an alternative for this case.
if (pSql->retryReason == TSDB_CODE_TDB_INVALID_TABLE_ID) {
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
int32_t numOfCols = tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
int32_t numOfTags = tscGetNumOfTags(pTableMetaInfo->pTableMeta);
SSchema *pSchema = tscGetTableSchema(pTableMetaInfo->pTableMeta);
SSchema *pTagSchema = tscGetTableTagSchema(pTableMetaInfo->pTableMeta);
for (int32_t i = 0; i < numOfExprs; ++i) {
SSqlExpr *pExpr = &(tscSqlExprGet(pQueryInfo, i)->base);
SSqlExpr *pExpr = &(tscExprGet(pQueryInfo, i)->base);
// update the table uid
pExpr->uid = pTableMetaInfo->pTableMeta->id.uid;
if (pExpr->colInfo.colIndex >= 0) {
int32_t index = pExpr->colInfo.colIndex;
if ((TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag) && index >= numOfCols) ||
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < numOfCols || index >= (numOfCols + numOfTags)))) {
(TSDB_COL_IS_TAG(pExpr->colInfo.flag) && (index < 0 || index >= numOfTags))) {
return pSql->retryReason;
}
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
if ((pTagSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pTagSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else if (TSDB_COL_IS_NORMAL_COL(pExpr->colInfo.flag)) {
if ((pSchema[pExpr->colInfo.colIndex].colId != pExpr->colInfo.colId) &&
strcasecmp(pExpr->colInfo.name, pSchema[pExpr->colInfo.colIndex].name) != 0) {
return pSql->retryReason;
}
} else { // do nothing for udc
}
}
}
......@@ -374,12 +400,12 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug("0x%"PRIx64" get %s successfully", pSql->self, msg);
if (pSql->pStream == NULL) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
// check if it is a sub-query of super table query first, if true, enter another routine
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY|TSDB_QUERY_TYPE_SUBQUERY|TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send the corresponding query", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, (TSDB_QUERY_TYPE_STABLE_SUBQUERY | TSDB_QUERY_TYPE_SUBQUERY |
TSDB_QUERY_TYPE_TAG_FILTER_QUERY))) {
tscDebug("0x%" PRIx64 " update cached table-meta, continue to process sql and send the corresponding query", pSql->self);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
......@@ -401,42 +427,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else { // continue to process normal async query
if (pCmd->parseFinished) {
tscDebug("0x%"PRIx64" update local table meta, continue to process sql and send corresponding query", pSql->self);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
assert(code == TSDB_CODE_TSC_ACTION_IN_PROGRESS || code == TSDB_CODE_SUCCESS);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
}
assert(pCmd->command != TSDB_SQL_INSERT);
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64" redo parse sql string and proceed", pSql->self);
pCmd->parseFinished = false;
tscResetSqlCmd(pCmd, true);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
tscBuildAndSendRequest(pSql, NULL);
} else { // in all other cases, simple retry
tscBuildAndSendRequest(pSql, NULL);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
} else {
tscDebug("0x%"PRIx64" continue parse sql after get table meta", pSql->self);
if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -446,8 +438,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
goto _error;
}
if (pCmd->insertType == TSDB_QUERY_TYPE_STMT_INSERT) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT)) {
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
......@@ -457,59 +449,52 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
}
(*pSql->fp)(pSql->param, pSql, code);
} else if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
} else {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
tscImportDataFromFile(pSql);
} else {
tscHandleMultivnodeInsert(pSql);
}
}
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again",
pSql->self);
tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
SQueryInfo* pQueryInfo1 = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
executeQuery(pSql, pQueryInfo1);
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
}
taosReleaseRef(tscObjRef, pSql->self);
return;
}
}
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
} else { // stream computing
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo1 = tscGetQueryInfo(pCmd);
executeQuery(pSql, pQueryInfo1);
}
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, pCmd->clauseIndex);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
taosReleaseRef(tscObjRef, pSql->self);
return;
} else if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
}
} else { // stream computing
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pCmd->command);
tscDebug("0x%"PRIx64" stream:%p meta is updated, start new query, command:%d", pSql->self, pSql->pStream, pSql->cmd.command);
if (!pSql->cmd.parseFinished) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (tscNumOfExprs(pQueryInfo) == 0) {
tsParseSql(pSql, false);
}
(*pSql->fp)(pSql->param, pSql, code);
taosReleaseRef(tscObjRef, pSql->self);
return;
}
// tscDoQuery(pSql);
taosReleaseRef(tscObjRef, pSql->self);
return;
_error:
......
......@@ -53,7 +53,7 @@ static int32_t tscSetValueToResObj(SSqlObj *pSql, int32_t rowLen) {
SSqlRes *pRes = &pSql->res;
// one column for each row
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -154,14 +154,14 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
pSql->cmd.numOfCols = numOfCols;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f = {.type = TSDB_DATA_TYPE_BINARY, .bytes = (TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE};
tstrncpy(f.name, "Field", sizeof(f.name));
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE, -1000, (TSDB_COL_NAME_LEN - 1), false);
rowLen += ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE);
......@@ -171,7 +171,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Type", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(typeColLength + VARSTR_HEADER_SIZE),
-1000, typeColLength, false);
rowLen += typeColLength + VARSTR_HEADER_SIZE;
......@@ -181,7 +181,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Length", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_INT, sizeof(int32_t),
-1000, sizeof(int32_t), false);
rowLen += sizeof(int32_t);
......@@ -191,7 +191,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
tstrncpy(f.name, "Note", sizeof(f.name));
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, (int16_t)(noteColLength + VARSTR_HEADER_SIZE),
-1000, noteColLength, false);
rowLen += noteColLength + VARSTR_HEADER_SIZE;
......@@ -199,7 +199,7 @@ static int32_t tscBuildTableSchemaResultFields(SSqlObj *pSql, int32_t numOfCols,
}
static int32_t tscProcessDescribeTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
assert(tscGetMetaInfo(pQueryInfo, 0)->pTableMeta != NULL);
......@@ -390,7 +390,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
SColumnIndex index = {0};
pSql->cmd.numOfCols = 2;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
TAOS_FIELD f;
......@@ -405,7 +405,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
SInternalField* pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY, f.bytes, -1000, f.bytes - VARSTR_HEADER_SIZE, false);
rowLen += f.bytes;
......@@ -418,7 +418,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
}
pInfo = tscFieldInfoAppend(&pQueryInfo->fieldsInfo, &f);
pInfo->pExpr = tscSqlExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
pInfo->pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TS_DUMMY, &index, TSDB_DATA_TYPE_BINARY,
(int16_t)(ddlLen + VARSTR_HEADER_SIZE), -1000, ddlLen, false);
rowLen += ddlLen + VARSTR_HEADER_SIZE;
......@@ -428,7 +428,7 @@ static int32_t tscSCreateBuildResultFields(SSqlObj *pSql, BuildType type, const
static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const char *tableName, const char *ddl) {
SSqlRes *pRes = &pSql->res;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t numOfRows = 1;
if (strlen(ddl) == 0) {
......@@ -445,7 +445,7 @@ static int32_t tscSCreateSetValueToResObj(SSqlObj *pSql, int32_t rowLen, const c
return 0;
}
static int32_t tscSCreateBuildResult(SSqlObj *pSql, BuildType type, const char *str, const char *result) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
int32_t rowLen = tscSCreateBuildResultFields(pSql, type, result);
tscFieldInfoUpdateOffset(pQueryInfo);
......@@ -532,7 +532,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
}
buf[0] = 0;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0, 0)->pTableMeta;
STableMeta *pMeta = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0)->pTableMeta;
if (pMeta->tableType == TSDB_SUPER_TABLE || pMeta->tableType == TSDB_NORMAL_TABLE ||
pMeta->tableType == TSDB_STREAM_TABLE) {
free(buf);
......@@ -553,7 +553,7 @@ static int32_t tscGetTableTagColumnName(SSqlObj *pSql, char **result) {
return TSDB_CODE_SUCCESS;
}
static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -607,7 +607,7 @@ static int32_t tscRebuildDDLForSubTable(SSqlObj *pSql, const char *tableName, ch
}
static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName, char *ddl) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -634,7 +634,7 @@ static int32_t tscRebuildDDLForNormalTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName, char *ddl) {
char *result = ddl;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableMeta * pMeta = pTableMetaInfo->pTableMeta;
......@@ -675,7 +675,7 @@ static int32_t tscRebuildDDLForSuperTable(SSqlObj *pSql, const char *tableName,
}
static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
assert(pTableMetaInfo->pTableMeta != NULL);
......@@ -704,7 +704,7 @@ static int32_t tscProcessShowCreateTable(SSqlObj *pSql) {
}
static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
......@@ -730,7 +730,7 @@ static int32_t tscProcessShowCreateDatabase(SSqlObj *pSql) {
return TSDB_CODE_TSC_ACTION_IN_PROGRESS;
}
static int32_t tscProcessCurrentUser(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resBytes = TSDB_USER_LEN + TSDB_DATA_TYPE_BINARY;
......@@ -757,7 +757,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
extractDBName(pSql->pTscObj->db, db);
pthread_mutex_unlock(&pSql->pTscObj->mutex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -784,7 +784,7 @@ static int32_t tscProcessCurrentDB(SSqlObj *pSql) {
static int32_t tscProcessServerVer(SSqlObj *pSql) {
const char* v = pSql->pTscObj->sversion;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, pSql->cmd.clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -807,7 +807,7 @@ static int32_t tscProcessServerVer(SSqlObj *pSql) {
}
static int32_t tscProcessClientVer(SSqlObj *pSql) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
pExpr->resType = TSDB_DATA_TYPE_BINARY;
......@@ -859,7 +859,7 @@ static int32_t tscProcessServStatus(SSqlObj *pSql) {
return pSql->res.code;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, 0);
int32_t val = 1;
......@@ -873,7 +873,7 @@ void tscSetLocalQueryResult(SSqlObj *pSql, const char *val, const char *columnNa
pCmd->numOfCols = 1;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
pQueryInfo->order.order = TSDB_ORDER_ASC;
tscFieldInfoClear(&pQueryInfo->fieldsInfo);
......@@ -928,7 +928,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
} else if (pCmd->command == TSDB_SQL_SERV_STATUS) {
pRes->code = tscProcessServStatus(pSql);
} else {
pRes->code = TSDB_CODE_TSC_INVALID_SQL;
pRes->code = TSDB_CODE_TSC_INVALID_OPERATION;
tscError("0x%"PRIx64" not support command:%d", pSql->self, pCmd->command);
}
......
此差异已折叠。
......@@ -107,7 +107,7 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
}
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (timePrec == TSDB_TIME_PRECISION_MILLI) {
......@@ -441,7 +441,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, SSqlCmd *pCmd, int1
*str += index;
if (sToken.type == TK_QUESTION) {
if (pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
if (pCmd->insertParam.insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
return tscSQLSyntaxErrMsg(pCmd->payload, "? only allowed in binding insertion", *str);
}
......@@ -647,7 +647,7 @@ static int32_t tsSetBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta,
pBlocks->sversion = pTableMeta->sversion;
if (pBlocks->numOfRows + numOfRows >= INT16_MAX) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
pBlocks->numOfRows += numOfRows;
return TSDB_CODE_SUCCESS;
......@@ -708,7 +708,7 @@ static int32_t doParseInsertStatement(SSqlCmd* pCmd, char **str, STableDataBlock
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
char tmpTokenBuf[16*1024] = {0}; // used for deleting Escape character: \\, \', \"
int32_t numOfRows = 0;
......@@ -747,12 +747,10 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
const int32_t STABLE_INDEX = 1;
SSqlCmd * pCmd = &pSql->cmd;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
char *sql = *sqlstr;
pSql->cmd.autoCreated = false;
// get the token of specified table
index = 0;
tableToken = tStrGetToken(sql, &index, false);
......@@ -786,7 +784,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
if (numOfColList == 0 && (*boundColumn) != NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, TABLE_INDEX);
......@@ -802,7 +800,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
STableMetaInfo *pSTableMetaInfo = tscGetMetaInfo(pQueryInfo, STABLE_INDEX);
code = tscSetTableFullName(pSTableMetaInfo, &sToken, pSql);
code = tscSetTableFullName(&pSTableMetaInfo->name, &sToken, pSql);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -879,7 +877,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
if (TK_ILLEGAL == sToken.type) {
tdDestroyKVRowBuilder(&kvRowBuilder);
tscDestroyBoundColumnInfo(&spd);
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.n == 0 || sToken.type == TK_RP) {
......@@ -961,7 +959,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
}
if (numOfColsAfterTags == 0 && (*boundColumn) != NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
sToken = tStrGetToken(sql, &index, false);
......@@ -973,13 +971,13 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
return tscInvalidSQLErrMsg(pCmd->payload, "invalid table name", *sqlstr);
}
int32_t ret = tscSetTableFullName(pTableMetaInfo, &tableToken, pSql);
int32_t ret = tscSetTableFullName(&pTableMetaInfo->name, &tableToken, pSql);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, true);
......@@ -991,7 +989,7 @@ static int32_t tscCheckIfCreateTable(char **sqlstr, SSqlObj *pSql, char** boundC
sql = sToken.z;
if (sql == NULL) {
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
code = tscGetTableMetaEx(pSql, pTableMetaInfo, false);
......@@ -1015,12 +1013,17 @@ int validateTableName(char *tblName, int len, SStrToken* psTblToken) {
return tscValidateName(psTblToken);
}
static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
if (pCmd->dataSourceType != 0 && pCmd->dataSourceType != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mix up", sql);
static int32_t validateDataSource(SSqlCmd *pCmd, int32_t type, const char *sql) {
uint32_t *insertType = &pCmd->insertParam.insertType;
if (*insertType == TSDB_QUERY_TYPE_STMT_INSERT && type == TSDB_QUERY_TYPE_INSERT) {
return TSDB_CODE_SUCCESS;
}
if ((*insertType) != 0 && (*insertType) != type) {
return tscInvalidSQLErrMsg(pCmd->payload, "keyword VALUES and FILE are not allowed to mixed up", sql);
}
pCmd->dataSourceType = type;
*insertType = type;
return TSDB_CODE_SUCCESS;
}
......@@ -1090,7 +1093,6 @@ static int32_t parseBoundColumns(SSqlCmd* pCmd, SParsedDataColInfo* pColInfo, SS
_clean:
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1106,7 +1108,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
int32_t totalNum = 0;
int32_t code = TSDB_CODE_SUCCESS;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
assert(pQueryInfo != NULL);
STableMetaInfo *pTableMetaInfo = (pQueryInfo->numOfTables == 0)? tscAddEmptyMetaInfo(pQueryInfo):tscGetMetaInfo(pQueryInfo, 0);
......@@ -1120,9 +1122,9 @@ int tsParseInsertSql(SSqlObj *pSql) {
return code;
}
if (NULL == pCmd->pTableBlockHashList) {
pCmd->pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (NULL == pCmd->pTableBlockHashList) {
if (NULL == pCmd->insertParam.pTableBlockHashList) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (NULL == pCmd->insertParam.pTableBlockHashList) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _clean;
}
......@@ -1130,7 +1132,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
str = pCmd->curSql;
}
tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->pTableBlockHashList);
tscDebug("0x%"PRIx64" create data block list hashList:%p", pSql->self, pCmd->insertParam.pTableBlockHashList);
while (1) {
int32_t index = 0;
......@@ -1142,7 +1144,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
* if the data is from the data file, no data has been generated yet. So, there no data to
* merge or submit, save the file path and parse the file in other routines.
*/
if (pCmd->dataSourceType == DATA_FROM_DATA_FILE) {
if (TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT)) {
goto _clean;
}
......@@ -1151,7 +1153,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
* Otherwise, create the first submit block and submit to virtual node.
*/
if (totalNum == 0) {
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto _clean;
} else {
break;
......@@ -1168,7 +1170,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
if ((code = tscSetTableFullName(&pTableMetaInfo->name, &sTblToken, pSql)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1203,7 +1205,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
if (sToken.type == TK_FILE) {
if (validateDataSource(pCmd, DATA_FROM_DATA_FILE, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_FILE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1236,12 +1238,12 @@ int tsParseInsertSql(SSqlObj *pSql) {
if (bindedColumns == NULL) {
STableMeta *pTableMeta = pTableMetaInfo->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
&dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -1254,14 +1256,14 @@ int tsParseInsertSql(SSqlObj *pSql) {
}
} else { // bindedColumns != NULL
// insert into tablename(col1, col2,..., coln) values(v1, v2,... vn);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
if (validateDataSource(pCmd, DATA_FROM_SQL_STRING, sToken.z) != TSDB_CODE_SUCCESS) {
if (validateDataSource(pCmd, TSDB_QUERY_TYPE_INSERT, sToken.z) != TSDB_CODE_SUCCESS) {
goto _clean;
}
STableDataBlocks *dataBuf = NULL;
int32_t ret = tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
int32_t ret = tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_DEFAULT_PAYLOAD_SIZE,
sizeof(SSubmitBlk), tinfo.rowSize, &pTableMetaInfo->name, pTableMeta,
&dataBuf, NULL);
if (ret != TSDB_CODE_SUCCESS) {
......@@ -1297,7 +1299,8 @@ int tsParseInsertSql(SSqlObj *pSql) {
goto _clean;
}
if ((pCmd->insertType != TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->pTableBlockHashList) > 0) { // merge according to vgId
// merge according to vgId
if (!TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_STMT_INSERT) && taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
if ((code = tscMergeTableDataBlocks(pSql, true)) != TSDB_CODE_SUCCESS) {
goto _clean;
}
......@@ -1308,7 +1311,6 @@ int tsParseInsertSql(SSqlObj *pSql) {
_clean:
pCmd->curSql = NULL;
pCmd->parseFinished = 1;
return code;
}
......@@ -1326,9 +1328,8 @@ int tsInsertInitialCheck(SSqlObj *pSql) {
pCmd->count = 0;
pCmd->command = TSDB_SQL_INSERT;
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT | pCmd->insertType);
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd);
TSDB_QUERY_SET_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT);
sToken = tStrGetToken(pSql->sqlstr, &index, false);
if (sToken.type != TK_INTO) {
......@@ -1343,11 +1344,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
int32_t ret = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
if ((!pCmd->parseFinished) && (!initial)) {
if (!initial) {
tscDebug("0x%"PRIx64" resume to parse sql: %s", pSql->self, pCmd->curSql);
}
ret = tscAllocPayload(&pSql->cmd, TSDB_DEFAULT_PAYLOAD_SIZE);
ret = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (TSDB_CODE_SUCCESS != ret) {
return ret;
}
......@@ -1357,31 +1358,32 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
return ret;
}
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql);
if ((sqlstr == NULL) || (pSql->parseRetry >= 1) ||
(ret != TSDB_CODE_TSC_SQL_SYNTAX_ERROR && ret != TSDB_CODE_TSC_INVALID_SQL)) {
free(sqlstr);
} else {
assert(ret == TSDB_CODE_SUCCESS || ret == TSDB_CODE_TSC_ACTION_IN_PROGRESS || ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION);
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
ret = tsParseInsertSql(pSql);
}
}
} else {
SSqlInfo SQLInfo = qSqlParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
SSqlInfo sqlInfo = qSqlParse(pSql->sqlstr);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
ret = tscValidateSqlInfo(pSql, &sqlInfo);
}
SqlInfoDestroy(&SQLInfo);
SqlInfoDestroy(&sqlInfo);
}
/*
......@@ -1398,8 +1400,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
SSqlCmd *pCmd = &pSql->cmd;
pSql->res.numOfRows = 0;
assert(pCmd->numOfClause == 1);
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0)->pTableMeta;
STableMeta *pTableMeta = tscGetTableMetaInfoFromCmd(pCmd, 0)->pTableMeta;
SSubmitBlk *pBlocks = (SSubmitBlk *)(pTableDataBlocks->pData);
code = tsSetBlockInfo(pBlocks, pTableMeta, numOfRows);
......@@ -1411,7 +1412,7 @@ static int doPackSendDataBlock(SSqlObj *pSql, int32_t numOfRows, STableDataBlock
return code;
}
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->pDataBlocks, 0);
STableDataBlocks *pDataBlock = taosArrayGetP(pCmd->insertParam.pDataBlocks, 0);
if ((code = tscCopyDataBlockToPayload(pSql, pDataBlock)) != TSDB_CODE_SUCCESS) {
return code;
}
......@@ -1461,17 +1462,17 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
// accumulate the total submit records
pParentSql->res.numOfRows += pSql->res.numOfRows;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
destroyTableNameList(pCmd);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
pCmd->insertParam.pDataBlocks = tscDestroyBlockArrayList(pCmd->insertParam.pDataBlocks);
if (pCmd->pTableBlockHashList == NULL) {
pCmd->pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->pTableBlockHashList == NULL) {
if (pCmd->insertParam.pTableBlockHashList == NULL) {
pCmd->insertParam.pTableBlockHashList = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false);
if (pCmd->insertParam.pTableBlockHashList == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _error;
}
......@@ -1479,7 +1480,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
STableDataBlocks *pTableDataBlock = NULL;
int32_t ret =
tscGetDataBlockFromList(pCmd->pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tscGetDataBlockFromList(pCmd->insertParam.pTableBlockHashList, pTableMeta->id.uid, TSDB_PAYLOAD_SIZE, sizeof(SSubmitBlk),
tinfo.rowSize, &pTableMetaInfo->name, pTableMeta, &pTableDataBlock, NULL);
if (ret != TSDB_CODE_SUCCESS) {
pParentSql->res.code = TSDB_CODE_TSC_OUT_OF_MEMORY;
......@@ -1561,8 +1562,8 @@ void tscImportDataFromFile(SSqlObj *pSql) {
return;
}
assert(pCmd->dataSourceType == DATA_FROM_DATA_FILE && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo[0];
assert(TSDB_QUERY_HAS_TYPE(pCmd->insertParam.insertType, TSDB_QUERY_TYPE_FILE_INSERT) && strlen(pCmd->payload) != 0);
pCmd->active = pCmd->pQueryInfo;
SImportFileSupport *pSupporter = calloc(1, sizeof(SImportFileSupport));
SSqlObj *pNew = createSubqueryObj(pSql, 0, parseFileSendDataBlock, pSupporter, TSDB_SQL_INSERT, NULL);
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
......@@ -373,11 +373,15 @@ int taos_num_fields(TAOS_RES *res) {
if (pSql == NULL || pSql->signature != pSql) return 0;
int32_t num = 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return num;
}
while(pQueryInfo->pDownstream != NULL) {
pQueryInfo = pQueryInfo->pDownstream;
}
size_t numOfCols = tscNumOfFields(pQueryInfo);
for(int32_t i = 0; i < numOfCols; ++i) {
SInternalField* pInfo = taosArrayGet(pQueryInfo->fieldsInfo.internalField, i);
......@@ -408,7 +412,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
SSqlRes *pRes = &pSql->res;
if (pSql == NULL || pSql->signature != pSql) return 0;
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return NULL;
}
......@@ -560,7 +564,7 @@ static bool tscKillQueryInDnode(SSqlObj* pSql) {
return true;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if ((pQueryInfo == NULL) || tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return true;
......@@ -614,7 +618,7 @@ int taos_errno(TAOS_RES *tres) {
* why the sql is invalid
*/
static bool hasAdditionalErrorInfo(int32_t code, SSqlCmd *pCmd) {
if (code != TSDB_CODE_TSC_INVALID_SQL
if (code != TSDB_CODE_TSC_INVALID_OPERATION
&& code != TSDB_CODE_TSC_SQL_SYNTAX_ERROR) {
return false;
}
......@@ -673,7 +677,7 @@ char *taos_get_client_info() { return version; }
static void tscKillSTableQuery(SSqlObj *pSql) {
SSqlCmd* pCmd = &pSql->cmd;
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
if (!tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
return;
......@@ -724,7 +728,7 @@ void taos_stop_query(TAOS_RES *res) {
// set the error code for master pSqlObj firstly
pSql->res.code = TSDB_CODE_TSC_QUERY_CANCELLED;
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, pCmd->clauseIndex);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
assert(pSql->rpcRid <= 0);
......@@ -754,7 +758,7 @@ bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return true;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo == NULL) {
return true;
}
......@@ -829,9 +833,9 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
case TSDB_DATA_TYPE_NCHAR: {
int32_t charLen = varDataLen((char*)row[i] - VARSTR_HEADER_SIZE);
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
assert(charLen <= fields[i].bytes);
assert(charLen <= fields[i].bytes && charLen >= 0);
} else {
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE);
assert(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0);
}
memcpy(str + len, row[i], charLen);
......@@ -868,15 +872,11 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
pSql->pTscObj = taos;
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
pRes->numOfTotal = 0;
pRes->numOfClauseTotal = 0;
pCmd->resColumnId = TSDB_RES_COL_ID;
tscDebug("0x%"PRIx64" Valid SQL: %s pObj:%p", pSql->self, sql, pObj);
......@@ -896,10 +896,10 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
strtolower(pSql->sqlstr, sql);
pCmd->curSql = NULL;
if (NULL != pCmd->pTableBlockHashList) {
taosHashCleanup(pCmd->pTableBlockHashList);
pCmd->pTableBlockHashList = NULL;
// pCmd->curSql = NULL;
if (NULL != pCmd->insertParam.pTableBlockHashList) {
taosHashCleanup(pCmd->insertParam.pTableBlockHashList);
pCmd->insertParam.pTableBlockHashList = NULL;
}
pSql->fp = asyncCallback;
......@@ -921,90 +921,19 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
return code;
}
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object
tscResetSqlCmd(&pSql->cmd, false);
SSqlCmd *pCmd = &pSql->cmd;
pCmd->command = TSDB_SQL_MULTI_META;
pCmd->count = 0;
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)tblNameList;
SQueryInfo *pQueryInfo = tscGetQueryInfoS(pCmd, pCmd->clauseIndex);
if (pQueryInfo == NULL) {
pSql->res.code = terrno;
return terrno;
}
STableMetaInfo *pTableMetaInfo = tscAddEmptyMetaInfo(pQueryInfo);
if ((code = tscAllocPayload(pCmd, tblListLen + 16)) != TSDB_CODE_SUCCESS) {
return code;
}
char *nextStr;
char tblName[TSDB_TABLE_FNAME_LEN];
int payloadLen = 0;
char *pMsg = pCmd->payload;
while (1) {
nextStr = strchr(str, ',');
if (nextStr == NULL) {
break;
}
memcpy(tblName, str, nextStr - str);
int32_t len = (int32_t)(nextStr - str);
tblName[len] = '\0';
str = nextStr + 1;
len = (int32_t)strtrim(tblName);
SStrToken sToken = {.n = len, .type = TK_ID, .z = tblName};
tGetToken(tblName, &sToken.type);
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "table name is invalid");
return code;
}
if ((code = tscSetTableFullName(pTableMetaInfo, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
if (++pCmd->count > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
}
int32_t xlen = tNameLen(&pTableMetaInfo->name);
if (payloadLen + xlen + 128 >= pCmd->allocSize) {
char *pNewMem = realloc(pCmd->payload, pCmd->allocSize + tblListLen);
if (pNewMem == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
sprintf(pCmd->payload, "failed to allocate memory");
return code;
}
pCmd->payload = pNewMem;
pCmd->allocSize = pCmd->allocSize + tblListLen;
pMsg = pCmd->payload;
}
char n[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, n);
payloadLen += sprintf(pMsg + payloadLen, "%s,", n);
void loadMultiTableMetaCallback(void *param, TAOS_RES *res, int code) {
SSqlObj* pSql = (SSqlObj*)taosAcquireRef(tscObjRef, (int64_t)param);
if (pSql == NULL) {
return;
}
*(pMsg + payloadLen) = '\0';
pCmd->payloadLen = payloadLen + 1;
taosReleaseRef(tscObjRef, pSql->self);
pSql->res.code = code;
tsem_post(&pSql->rspSem);
}
return TSDB_CODE_SUCCESS;
static void freeElem(void* p) {
tfree(*(char**)p);
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
......@@ -1020,38 +949,28 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
pSql->pTscObj = taos;
pSql->signature = pSql;
SSqlRes *pRes = &pSql->res;
pRes->code = 0;
pRes->numOfTotal = 0; // the number of getting table meta from server
pRes->numOfClauseTotal = 0;
assert(pSql->fp == NULL);
tscDebug("0x%"PRIx64" tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
pSql->fp = NULL; // todo set the correct callback function pointer
pSql->cmd.pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
int32_t tblListLen = (int32_t)strlen(tableNameList);
if (tblListLen > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, tblListLen, MAX_TABLE_NAME_LENGTH);
int32_t length = (int32_t)strlen(tableNameList);
if (length > MAX_TABLE_NAME_LENGTH) {
tscError("0x%"PRIx64" tableNameList too long, length:%d, maximum allowed:%d", pSql->self, length, MAX_TABLE_NAME_LENGTH);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_INVALID_SQL;
return TSDB_CODE_TSC_INVALID_OPERATION;
}
char *str = calloc(1, tblListLen + 1);
char *str = calloc(1, length + 1);
if (str == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscError("0x%"PRIx64" failed to allocate sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
strtolower(str, tableNameList);
int32_t code = (uint8_t) tscParseTblNameList(pSql, str, tblListLen);
/*
* set the qhandle to 0 before return in order to erase the qhandle value assigned in the previous successful query.
* If qhandle is NOT set 0, the function of taos_free_result() will send message to server by calling tscBuildAndSendRequest()
* to free connection, which may cause segment fault, when the parse phrase is not even successfully executed.
*/
pRes->qId = 0;
SArray* plist = taosArrayInit(4, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(4, POINTER_BYTES);
int32_t code = (uint8_t) tscTransferTableNameList(pSql, str, length, plist);
free(str);
if (code != TSDB_CODE_SUCCESS) {
......@@ -1059,12 +978,23 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
return code;
}
tscDoQuery(pSql);
registerSqlObj(pSql);
tscDebug("0x%"PRIx64" load multiple table meta, tableNameList: %s pObj:%p", pSql->self, tableNameList, pObj);
tscDebug("0x%"PRIx64" load multi-table meta result:%d %s pObj:%p", pSql->self, pRes->code, taos_errstr(pSql), pObj);
if ((code = pRes->code) != TSDB_CODE_SUCCESS) {
tscFreeSqlObj(pSql);
code = getMultiTableMetaFromMnode(pSql, plist, vgroupList, loadMultiTableMetaCallback);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
code = TSDB_CODE_SUCCESS;
}
taosArrayDestroyEx(plist, freeElem);
taosArrayDestroyEx(vgroupList, freeElem);
if (code != TSDB_CODE_SUCCESS) {
tscFreeRegisteredSqlObj(pSql);
return code;
}
tsem_wait(&pSql->rspSem);
tscFreeRegisteredSqlObj(pSql);
return code;
}
......@@ -37,7 +37,7 @@ static int64_t getDelayValueAfterTimewindowClosed(SSqlStream* pStream, int64_t l
static bool isProjectStream(SQueryInfo* pQueryInfo) {
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
SExprInfo *pExpr = tscSqlExprGet(pQueryInfo, i);
SExprInfo *pExpr = tscExprGet(pQueryInfo, i);
if (pExpr->base.functionId != TSDB_FUNC_PRJ) {
return false;
}
......@@ -89,12 +89,12 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
return;
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
code = tscGetTableMeta(pSql, pTableMetaInfo);
if (code == 0 && UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
code = tscGetSTableVgroupInfo(pSql, 0);
code = tscGetSTableVgroupInfo(pSql, pQueryInfo);
}
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
......@@ -138,7 +138,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
if (pStream->isProject) {
......@@ -197,7 +197,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
......@@ -224,7 +224,7 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) {
#if 0
SSqlObj * pSql = pStream->pSql;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) {
return;
......@@ -273,7 +273,7 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
STableMetaInfo *pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful.
......@@ -444,7 +444,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
int64_t minIntervalTime =
(pStream->precision == TSDB_TIME_PRECISION_MICRO) ? tsMinIntervalTime * 1000L : tsMinIntervalTime;
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (!pStream->isProject && pQueryInfo->interval.interval == 0) {
sprintf(pSql->cmd.payload, "the interval value is 0");
......@@ -494,7 +494,7 @@ static int32_t tscSetSlidingWindowInfo(SSqlObj *pSql, SSqlStream *pStream) {
}
static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, int64_t stime) {
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
if (pStream->isProject) {
// no data in table, flush all data till now to destination meter, 10sec delay
......@@ -556,7 +556,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
return;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
STableComInfo tinfo = tscGetTableInfo(pTableMetaInfo->pTableMeta);
......@@ -614,16 +614,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return NULL;
}
pStream->stime = stime;
pStream->fp = fp;
pStream->stime = stime;
pStream->fp = fp;
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pStream->param = param;
pStream->pSql = pSql;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql);
......@@ -632,14 +632,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
strtolower(pSql->sqlstr, sqlstr);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
pSql->cmd.resColumnId = TSDB_RES_COL_ID;
tsem_init(&pSql->rspSem, 0, 0);
registerSqlObj(pSql);
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
tsem_init(&pSql->rspSem, 0, 0);
pSql->fp = tscCreateStream;
pSql->fetchFp = tscCreateStream;
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
......
......@@ -151,6 +151,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
strtolower(pSql->sqlstr, pSql->sqlstr);
pRes->qId = 0;
pRes->numOfRows = 1;
pCmd->resColumnId = TSDB_RES_COL_ID;
code = tscAllocPayload(pCmd, TSDB_DEFAULT_PAYLOAD_SIZE);
if (code != TSDB_CODE_SUCCESS) {
......@@ -173,7 +174,7 @@ static SSub* tscCreateSubscription(STscObj* pObj, const char* topic, const char*
if (pSql->cmd.command != TSDB_SQL_SELECT && pSql->cmd.command != TSDB_SQL_RETRIEVE_EMPTY_RESULT) {
line = __LINE__;
code = TSDB_CODE_TSC_INVALID_SQL;
code = TSDB_CODE_TSC_INVALID_OPERATION;
goto fail;
}
......@@ -266,7 +267,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
pSub->lastSyncTime = taosGetTimestampMs();
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SSubscriptionProgress target = {.uid = pTableMeta->id.uid, .key = 0};
......@@ -284,7 +285,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
size_t numOfTables = taosArrayGetSize(tables);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd, 0);
SQueryInfo* pQueryInfo = tscGetQueryInfo(pCmd);
SArray* progress = taosArrayInit(numOfTables, sizeof(SSubscriptionProgress));
for( size_t i = 0; i < numOfTables; i++ ) {
STidTags* tt = taosArrayGet( tables, i );
......@@ -304,7 +305,7 @@ static int tscUpdateSubscription(STscObj* pObj, SSub* pSub) {
}
taosArrayDestroy(tables);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd, 0)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
TSDB_QUERY_SET_TYPE(tscGetQueryInfo(pCmd)->type, TSDB_QUERY_TYPE_MULTITABLE_QUERY);
return 1;
}
......@@ -503,8 +504,8 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
SSqlObj *pSql = pSub->pSql;
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd, 0);
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
if (taosArrayGetSize(pSub->progress) > 0) { // fix crash in single table subscription
size_t size = taosArrayGetSize(pSub->progress);
......
此差异已折叠。
此差异已折叠。
......@@ -44,8 +44,8 @@ typedef struct SResPair {
// the structure for sql function in select clause
typedef struct SSqlExpr {
char aliasName[TSDB_COL_NAME_LEN]; // as aliasName
char token[TSDB_COL_NAME_LEN]; // original token
SColIndex colInfo;
uint64_t uid; // refactor use the pointer
int16_t functionId; // function id in aAgg array
......@@ -92,8 +92,6 @@ size_t tableIdPrefix(const char* name, char* prefix, int32_t len);
void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
//SSchema tGetTbnameColumnSchema();
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
......
......@@ -2569,6 +2569,7 @@ _arithmetic_operator_fn_t getArithmeticOperatorFn(int32_t arithmeticOptr) {
case TSDB_BINARY_OP_REMAINDER:
return vectorRemainder;
default:
assert(0);
return NULL;
}
}
......@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <texpr.h>
#include "os.h"
#include "texpr.h"
......@@ -465,27 +466,29 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
return expr;
}
tExprNode* exprdup(tExprNode* pTree) {
if (pTree == NULL) {
tExprNode* exprdup(tExprNode* pNode) {
if (pNode == NULL) {
return NULL;
}
tExprNode* pNode = calloc(1, sizeof(tExprNode));
if (pTree->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pTree->_node.pLeft);
tExprNode* pRight = exprdup(pTree->_node.pRight);
pNode->nodeType = TSQL_NODE_EXPR;
pNode->_node.pLeft = pLeft;
pNode->_node.pRight = pRight;
} else if (pTree->nodeType == TSQL_NODE_VALUE) {
pNode->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pNode->pVal, pTree->pVal);
} else if (pTree->nodeType == TSQL_NODE_COL) {
pNode->pSchema = calloc(1, sizeof(SSchema));
*pNode->pSchema = *pTree->pSchema;
tExprNode* pCloned = calloc(1, sizeof(tExprNode));
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprNode* pLeft = exprdup(pNode->_node.pLeft);
tExprNode* pRight = exprdup(pNode->_node.pRight);
pCloned->_node.pLeft = pLeft;
pCloned->_node.pRight = pRight;
pCloned->_node.optr = pNode->_node.optr;
pCloned->_node.hasPK = pNode->_node.hasPK;
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
pCloned->pVal = calloc(1, sizeof(tVariant));
tVariantAssign(pCloned->pVal, pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
pCloned->pSchema = calloc(1, sizeof(SSchema));
*pCloned->pSchema = *pNode->pSchema;
}
return pNode;
pCloned->nodeType = pNode->nodeType;
return pCloned;
}
......@@ -310,6 +310,16 @@ public class TSDBJNIConnector {
private native int setBindTableNameImp(long stmt, String name, long conn);
public void setBindTableNameAndTags(long stmt, String tableName, int numOfTags, ByteBuffer tags, ByteBuffer typeList, ByteBuffer lengthList, ByteBuffer nullList) throws SQLException {
int code = setTableNameTagsImp(stmt, tableName, numOfTags, tags.array(), typeList.array(), lengthList.array(),
nullList.array(), this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "failed to bind table name and corresponding tags");
}
}
private native int setTableNameTagsImp(long stmt, String name, int numOfTags, byte[] tags, byte[] typeList, byte[] lengthList, byte[] nullList, long conn);
public void bindColumnDataArray(long stmt, ByteBuffer colDataList, ByteBuffer lengthList, ByteBuffer isNullList, int type, int bytes, int numOfRows,int columnIndex) throws SQLException {
int code = bindColDataImp(stmt, colDataList.array(), lengthList.array(), isNullList.array(), type, bytes, numOfRows, columnIndex, this.taos);
if (code != TSDBConstants.JNI_SUCCESS) {
......
......@@ -41,6 +41,9 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
private boolean isPrepared;
private ArrayList<ColumnInfo> colData;
private ArrayList<TableTagInfo> tableTags;
private int tagValueLength;
private String tableName;
private long nativeStmtHandle = 0;
......@@ -63,8 +66,8 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
if (parameterCnt > 1) {
// the table name is also a parameter, so ignore it.
this.colData = new ArrayList<ColumnInfo>(parameterCnt - 1);
this.colData.addAll(Collections.nCopies(parameterCnt - 1, null));
this.colData = new ArrayList<ColumnInfo>();
this.tableTags = new ArrayList<TableTagInfo>();
}
}
......@@ -562,11 +565,109 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
}
};
private static class TableTagInfo {
private boolean isNull;
private Object value;
private int type;
public TableTagInfo(Object value, int type) {
this.value = value;
this.type = type;
}
public static TableTagInfo createNullTag(int type) {
TableTagInfo info = new TableTagInfo(null, type);
info.isNull = true;
return info;
}
};
public void setTableName(String name) {
this.tableName = name;
}
private void ensureTagCapacity(int index) {
if (this.tableTags.size() < index + 1) {
int delta = index + 1 - this.tableTags.size();
this.tableTags.addAll(Collections.nCopies(delta, null));
}
}
public void setTagNull(int index, int type) {
ensureTagCapacity(index);
this.tableTags.set(index, TableTagInfo.createNullTag(type));
}
public void setTagBoolean(int index, boolean value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BOOL));
this.tagValueLength += Byte.BYTES;
}
public void setTagInt(int index, int value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_INT));
this.tagValueLength += Integer.BYTES;
}
public void setTagByte(int index, byte value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TINYINT));
this.tagValueLength += Byte.BYTES;
}
public void setTagShort(int index, short value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_SMALLINT));
this.tagValueLength += Short.BYTES;
}
public void setTagLong(int index, long value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BIGINT));
this.tagValueLength += Long.BYTES;
}
public void setTagTimestamp(int index, long value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP));
this.tagValueLength += Long.BYTES;
}
public void setTagFloat(int index, float value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_FLOAT));
this.tagValueLength += Float.BYTES;
}
public void setTagDouble(int index, double value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_DOUBLE));
this.tagValueLength += Double.BYTES;
}
public void setTagString(int index, String value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_BINARY));
this.tagValueLength += value.getBytes().length;
}
public void setTagNString(int index, String value) {
ensureTagCapacity(index);
this.tableTags.set(index, new TableTagInfo(value, TSDBConstants.TSDB_DATA_TYPE_NCHAR));
String charset = TaosGlobalConfig.getCharset();
try {
this.tagValueLength += value.getBytes(charset).length;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
public <T> void setValueImpl(int columnIndex, ArrayList<T> list, int type, int bytes) throws SQLException {
if (this.colData.size() == 0) {
this.colData.addAll(Collections.nCopies(this.parameters.length - 1 - this.tableTags.size(), null));
}
ColumnInfo col = (ColumnInfo) this.colData.get(columnIndex);
if (col == null) {
ColumnInfo p = new ColumnInfo();
......@@ -641,7 +742,122 @@ public class TSDBPreparedStatement extends TSDBStatement implements PreparedStat
TSDBJNIConnector connector = ((TSDBConnection) this.getConnection()).getConnector();
this.nativeStmtHandle = connector.prepareStmt(rawSql);
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
if (this.tableTags == null) {
connector.setBindTableName(this.nativeStmtHandle, this.tableName);
} else {
int num = this.tableTags.size();
ByteBuffer tagDataList = ByteBuffer.allocate(this.tagValueLength);
tagDataList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer typeList = ByteBuffer.allocate(num);
typeList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer lengthList = ByteBuffer.allocate(num * Long.BYTES);
lengthList.order(ByteOrder.LITTLE_ENDIAN);
ByteBuffer isNullList = ByteBuffer.allocate(num * Integer.BYTES);
isNullList.order(ByteOrder.LITTLE_ENDIAN);
for (int i = 0; i < num; ++i) {
TableTagInfo tag = this.tableTags.get(i);
if (tag.isNull) {
typeList.put((byte) tag.type);
isNullList.putInt(1);
lengthList.putLong(0);
continue;
}
switch (tag.type) {
case TSDBConstants.TSDB_DATA_TYPE_INT: {
Integer val = (Integer) tag.value;
tagDataList.putInt(val);
lengthList.putLong(Integer.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TINYINT: {
Byte val = (Byte) tag.value;
tagDataList.put(val);
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_BOOL: {
Boolean val = (Boolean) tag.value;
tagDataList.put((byte) (val ? 1 : 0));
lengthList.putLong(Byte.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_SMALLINT: {
Short val = (Short) tag.value;
tagDataList.putShort(val);
lengthList.putLong(Short.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_TIMESTAMP:
case TSDBConstants.TSDB_DATA_TYPE_BIGINT: {
Long val = (Long) tag.value;
tagDataList.putLong(val == null ? 0 : val);
lengthList.putLong(Long.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_FLOAT: {
Float val = (Float) tag.value;
tagDataList.putFloat(val == null ? 0 : val);
lengthList.putLong(Float.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_DOUBLE: {
Double val = (Double) tag.value;
tagDataList.putDouble(val == null ? 0 : val);
lengthList.putLong(Double.BYTES);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_NCHAR:
case TSDBConstants.TSDB_DATA_TYPE_BINARY: {
String charset = TaosGlobalConfig.getCharset();
String val = (String) tag.value;
byte[] b = null;
try {
if (tag.type == TSDBConstants.TSDB_DATA_TYPE_BINARY) {
b = val.getBytes();
} else {
b = val.getBytes(charset);
}
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
tagDataList.put(b);
lengthList.putLong(b.length);
break;
}
case TSDBConstants.TSDB_DATA_TYPE_UTINYINT:
case TSDBConstants.TSDB_DATA_TYPE_USMALLINT:
case TSDBConstants.TSDB_DATA_TYPE_UINT:
case TSDBConstants.TSDB_DATA_TYPE_UBIGINT: {
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "not support data types");
}
}
typeList.put((byte) tag.type);
isNullList.putInt(tag.isNull? 1 : 0);
}
connector.setBindTableNameAndTags(this.nativeStmtHandle, this.tableName, this.tableTags.size(), tagDataList,
typeList, lengthList, isNullList);
}
ColumnInfo colInfo = (ColumnInfo) this.colData.get(0);
if (colInfo == null) {
......
......@@ -86,6 +86,17 @@ static SStep tsDnodeSteps[] = {
{"dnode-telemetry", dnodeInitTelemetry, dnodeCleanupTelemetry},
};
static SStep tsDnodeCompactSteps[] = {
{"dnode-tfile", tfInit, tfCleanup},
{"dnode-storage", dnodeInitStorage, dnodeCleanupStorage},
{"dnode-eps", dnodeInitEps, dnodeCleanupEps},
{"dnode-wal", walInit, walCleanUp},
{"dnode-mread", dnodeInitMRead, NULL},
{"dnode-mwrite", dnodeInitMWrite, NULL},
{"dnode-mpeer", dnodeInitMPeer, NULL},
{"dnode-modules", dnodeInitModules, dnodeCleanupModules},
};
static int dnodeCreateDir(const char *dir) {
if (mkdir(dir, 0755) != 0 && errno != EEXIST) {
return -1;
......@@ -95,13 +106,23 @@ static int dnodeCreateDir(const char *dir) {
}
static void dnodeCleanupComponents() {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize);
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
dnodeStepCleanup(tsDnodeCompactSteps, stepSize);
}
}
static int32_t dnodeInitComponents() {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize);
if (!tsCompactMnodeWal) {
int32_t stepSize = sizeof(tsDnodeSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeSteps, stepSize);
} else {
int32_t stepSize = sizeof(tsDnodeCompactSteps) / sizeof(SStep);
return dnodeStepInit(tsDnodeCompactSteps, stepSize);
}
}
static int32_t dnodeInitTmr() {
......
......@@ -112,6 +112,7 @@ typedef struct TAOS_MULTI_BIND {
TAOS_STMT *taos_stmt_init(TAOS *taos);
int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags);
int taos_stmt_set_tbname(TAOS_STMT* stmt, const char* name);
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......
......@@ -347,6 +347,7 @@ do { \
#define TSDB_QUERY_TYPE_TAG_FILTER_QUERY 0x400u
#define TSDB_QUERY_TYPE_INSERT 0x100u // insert type
#define TSDB_QUERY_TYPE_MULTITABLE_QUERY 0x200u
#define TSDB_QUERY_TYPE_FILE_INSERT 0x400u // insert data from file
#define TSDB_QUERY_TYPE_STMT_INSERT 0x800u // stmt insert type
#define TSDB_QUERY_HAS_TYPE(x, _type) (((x) & (_type)) != 0)
......
......@@ -74,7 +74,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_REF_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x010A) //"Ref is not there")
//client
#define TSDB_CODE_TSC_INVALID_SQL TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid SQL statement")
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) //"Invalid Operation")
#define TSDB_CODE_TSC_INVALID_QHANDLE TAOS_DEF_ERROR_CODE(0, 0x0201) //"Invalid qhandle")
#define TSDB_CODE_TSC_INVALID_TIME_STAMP TAOS_DEF_ERROR_CODE(0, 0x0202) //"Invalid combination of client/service time")
#define TSDB_CODE_TSC_INVALID_VALUE TAOS_DEF_ERROR_CODE(0, 0x0203) //"Invalid value in client")
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册