提交 e1079416 编写于 作者: H Haojun Liao

Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj

......@@ -163,12 +163,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printf("subscribe err\n");
return;
}
/*int32_t cnt = 0;*/
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
if (tmqmessage) {
/*cnt++;*/
cnt++;
printf("get data\n");
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
......
......@@ -136,7 +136,8 @@ static FORCE_INLINE void colDataAppendInt8(SColumnInfoData* pColumnInfoData, uin
}
static FORCE_INLINE void colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v;
}
......@@ -210,15 +211,19 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
void blockDebugShowData(const SArray* dataBlocks);
static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows);
}
static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data,
int8_t compressed) {
int8_t compressed) {
int32_t colSize = colDataGetLength(pColRes, numOfRows);
return (*(tDataTypes[pColRes->info.type].compFunc))(pColRes->pData, colSize, numOfRows, data,
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
}
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
int8_t needCompress) {
int8_t needCompress) {
int32_t* colSizes = (int32_t*)data;
data += numOfCols * sizeof(int32_t);
......
......@@ -18,6 +18,7 @@
#include "tarray.h"
#include "tdef.h"
#include "tconfig.h"
#ifdef __cplusplus
extern "C" {
......@@ -129,6 +130,7 @@ void taosCfgDynamicOptions(const char *option, const char *value);
void taosAddDataDir(int32_t index, char *v1, int32_t level, int32_t primary);
struct SConfig *taosGetCfg();
int32_t taosAddClientLogCfg(SConfig *pCfg);
#ifdef __cplusplus
}
......
......@@ -260,6 +260,7 @@ typedef struct {
typedef struct SSchema {
int8_t type;
int8_t index; // default is 0, not index created
col_id_t colId;
int32_t bytes;
char name[TSDB_COL_NAME_LEN];
......
......@@ -142,6 +142,43 @@ typedef struct {
} \
} while (0)
#define NUM_TO_STRING(_inputType, _input, _outputBytes, _output) \
do { \
switch (_inputType) { \
case TSDB_DATA_TYPE_TINYINT: \
snprintf(_output, (int32_t)(_outputBytes), "%d", *(int8_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_UTINYINT: \
snprintf(_output, (int32_t)(_outputBytes), "%d", *(uint8_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
snprintf(_output, (int32_t)(_outputBytes), "%d", *(int16_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_USMALLINT: \
snprintf(_output, (int32_t)(_outputBytes), "%d", *(uint16_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_TIMESTAMP: \
case TSDB_DATA_TYPE_BIGINT: \
snprintf(_output, (int32_t)(_outputBytes), "%" PRId64, *(int64_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
snprintf(_output, (int32_t)(_outputBytes), "%" PRIu64, *(uint64_t *)(_input)); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
snprintf(_output, (int32_t)(_outputBytes), "%f", *(float *)(_input)); \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
snprintf(_output, (int32_t)(_outputBytes), "%f", *(double *)(_input)); \
break; \
case TSDB_DATA_TYPE_UINT: \
snprintf(_output, (int32_t)(_outputBytes), "%u", *(uint32_t *)(_input)); \
break; \
default: \
snprintf(_output, (int32_t)(_outputBytes), "%d", *(int32_t *)(_input)); \
break; \
} \
} while (0)
#define IS_SIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_TINYINT && (_t) <= TSDB_DATA_TYPE_BIGINT)
#define IS_UNSIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_UTINYINT && (_t) <= TSDB_DATA_TYPE_UBIGINT)
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)
......
......@@ -78,6 +78,9 @@ typedef struct {
typedef struct {
float uptime; // day
int8_t has_mnode;
int8_t has_qnode;
int8_t has_snode;
int8_t has_bnode;
SMonDiskDesc logdir;
SMonDiskDesc tempdir;
} SMonDnodeInfo;
......@@ -134,8 +137,8 @@ typedef struct {
typedef struct {
int32_t expire_time;
int32_t timeseries_used;
int32_t timeseries_total;
int64_t timeseries_used;
int64_t timeseries_total;
} SMonGrantInfo;
typedef struct {
......
......@@ -19,8 +19,8 @@
extern "C" {
#endif
#include "tcommon.h"
#include "nodes.h"
#include "tcommon.h"
typedef struct SFilterInfo SFilterInfo;
typedef int32_t (*filer_get_col_from_id)(void *, int32_t, void **);
......@@ -31,20 +31,20 @@ enum {
FLT_OPTION_NEED_UNIQE = 4,
};
typedef struct SFilterColumnParam{
typedef struct SFilterColumnParam {
int32_t numOfCols;
SArray* pDataBlock;
SArray *pDataBlock;
} SFilterColumnParam;
extern int32_t filterInitFromNode(SNode *pNode, SFilterInfo **pinfo, uint32_t options);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnDataAgg *statis, int16_t numOfCols);
extern bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t **p, SColumnDataAgg *statis, int16_t numOfCols);
extern int32_t filterSetDataFromSlotId(SFilterInfo *info, void *param);
extern int32_t filterSetDataFromColId(SFilterInfo *info, void *param);
extern int32_t filterGetTimeRange(SNode *pNode, STimeWindow *win, bool *isStrict);
extern int32_t filterConverNcharColumns(SFilterInfo* pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo* pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
extern int32_t filterConverNcharColumns(SFilterInfo *pFilterInfo, int32_t rows, bool *gotNchar);
extern int32_t filterFreeNcharColumns(SFilterInfo *pFilterInfo);
extern void filterFreeInfo(SFilterInfo *info);
extern bool filterRangeExecute(SFilterInfo *info, SColumnDataAgg *pDataStatis, int32_t numOfCols, int32_t numOfRows);
#ifdef __cplusplus
}
......
......@@ -70,6 +70,9 @@ int32_t ltrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOut
int32_t rtrimFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
/* Conversion functions */
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
bool getTimePseudoFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t winStartTsFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
......
......@@ -39,7 +39,7 @@ int32_t taosGetEmail(char *email, int32_t maxLen);
int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen);
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
int32_t taosGetCpuCores(float *numOfCores);
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine);
void taosGetCpuUsage(double *cpu_system, double *cpu_engine);
int32_t taosGetTotalMemory(int64_t *totalKB);
int32_t taosGetProcMemory(int64_t *usedKB);
int32_t taosGetSysMemory(int64_t *usedKB);
......
......@@ -597,6 +597,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_INVALID_ROLLUP_OPTION TAOS_DEF_ERROR_CODE(0, 0x2622)
#define TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION TAOS_DEF_ERROR_CODE(0, 0x2623)
#define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624)
#define TSDB_CODE_PAR_INVALID_OPTION_UNIT TAOS_DEF_ERROR_CODE(0, 0x2625)
#define TSDB_CODE_PAR_INVALID_KEEP_UNIT TAOS_DEF_ERROR_CODE(0, 0x2626)
//planner
#define TSDB_CODE_PLAN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x2700)
......
......@@ -75,12 +75,12 @@ typedef int32_t (*FHbReqHandle)(SClientHbKey* connKey, void* param, SClientHbReq
typedef struct {
int8_t inited;
// ctl
int8_t threadStop;
TdThread thread;
int8_t threadStop;
TdThread thread;
TdThreadMutex lock; // used when app init and cleanup
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
SArray* appHbMgrs; // SArray<SAppHbMgr*> one for each cluster
FHbReqHandle reqHandle[HEARTBEAT_TYPE_MAX];
FHbRspHandle rspHandle[HEARTBEAT_TYPE_MAX];
} SClientHbMgr;
typedef struct SQueryExecMetric {
......@@ -118,42 +118,42 @@ struct SAppInstInfo {
};
typedef struct SAppInfo {
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char* ep;
int32_t pid;
int32_t numOfThreads;
SHashObj* pInstMap;
int64_t startTime;
char appName[TSDB_APP_NAME_LEN];
char* ep;
int32_t pid;
int32_t numOfThreads;
SHashObj* pInstMap;
TdThreadMutex mutex;
} SAppInfo;
typedef struct STscObj {
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
int32_t acctId;
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
char user[TSDB_USER_LEN];
char pass[TSDB_PASSWORD_LEN];
char db[TSDB_DB_FNAME_LEN];
char ver[128];
int32_t acctId;
uint32_t connId;
int32_t connType;
uint64_t id; // ref ID returned by taosAddRef
TdThreadMutex mutex; // used to protect the operation on db
int32_t numOfReqs; // number of sqlObj bound to this connection
SAppInstInfo* pAppInfo;
} STscObj;
typedef struct SResultColumn {
union {
char* nullbitmap; // bitmap, one bit for each item in the list
int32_t* offset;
char* nullbitmap; // bitmap, one bit for each item in the list
int32_t* offset;
};
char* pData;
char* pData;
} SResultColumn;
typedef struct SReqResultInfo {
const char* pRspMsg;
const char* pData;
TAOS_FIELD* fields; // todo, column names are not needed.
TAOS_FIELD* userFields; // the fields info that return to user
TAOS_FIELD* fields; // todo, column names are not needed.
TAOS_FIELD* userFields; // the fields info that return to user
uint32_t numOfCols;
int32_t* length;
char** convertBuf;
......@@ -180,13 +180,30 @@ typedef struct SRequestSendRecvBody {
SShowReqInfo showInfo; // todo this attribute will be removed after the query framework being completed.
SDataBuf requestMsg;
int64_t queryJob; // query job, created according to sql query DAG.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
struct SQueryPlan* pDag; // the query dag, generated according to the sql statement.
SReqResultInfo resInfo;
} SRequestSendRecvBody;
#define ERROR_MSG_BUF_DEFAULT_SIZE 512
enum {
RES_TYPE__QUERY = 1,
RES_TYPE__TMQ,
};
#define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY)
#define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ)
typedef struct SMqRspObj {
int8_t resType;
char* topic;
void* vg;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
} SMqRspObj;
typedef struct SRequestObj {
int8_t resType; // query or tmq
uint64_t requestId;
int32_t type; // request type
STscObj* pTscObj;
......@@ -203,6 +220,25 @@ typedef struct SRequestObj {
SRequestSendRecvBody body;
} SRequestObj;
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
int32_t resIter = msg->resIter == -1 ? 0 : msg->resIter;
return (SReqResultInfo*)taosArrayGet(msg->res, resIter);
}
static FORCE_INLINE SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res;
if (++msg->resIter < taosArrayGetSize(msg->res)) {
return (SReqResultInfo*)taosArrayGet(msg->res, msg->resIter);
}
return NULL;
}
static FORCE_INLINE SReqResultInfo* tscGetCurResInfo(TAOS_RES* res) {
if (TD_RES_QUERY(res)) return &(((SRequestObj*)res)->body.resInfo);
return tmqGetCurResInfo(res);
}
extern SAppInfo appInfo;
extern int32_t clientReqRefPool;
extern int32_t clientConnRefPool;
......@@ -238,14 +274,17 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port);
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
int32_t buildRequest(STscObj* pTscObj, const char* sql, int sqlLen, SRequestObj** pRequest);
int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery);
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList);
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4);
void doSetOneRowPtr(SReqResultInfo* pResultInfo);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
// --- heartbeat
// global, called by mgmt
......
......@@ -149,6 +149,7 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
return NULL;
}
pRequest->resType = RES_TYPE__QUERY;
pRequest->pDb = getDbOfConnection(pObj);
pRequest->requestId = generateRequestId();
pRequest->metric.start = taosGetTimestampUs();
......
......@@ -13,7 +13,6 @@
static int32_t initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet);
static SMsgSendInfo* buildConnectMsg(SRequestObj* pRequest);
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody);
static int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
static bool stringLengthCheck(const char* str, size_t maxsize) {
if (str == NULL) {
......@@ -42,7 +41,6 @@ static char* getClusterKey(const char* user, const char* auth, const char* ip, i
static STscObj* taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
SAppInstInfo* pAppInfo);
static void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port) {
......@@ -174,7 +172,7 @@ int32_t parseSql(SRequestObj* pRequest, bool topicQuery, SQuery** pQuery) {
int32_t execLocalCmd(SRequestObj* pRequest, SQuery* pQuery) {
SRetrieveTableRsp* pRsp = NULL;
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
int32_t code = qExecCommand(pQuery->pRoot, &pRsp);
if (TSDB_CODE_SUCCESS == code && NULL != pRsp) {
code = setQueryResultFromRsp(&pRequest->body.resInfo, pRsp, false);
}
......@@ -190,7 +188,7 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
SCmdMsgInfo* pMsgInfo = pQuery->pCmdMsg;
pRequest->type = pMsgInfo->msgType;
pRequest->body.requestMsg = (SDataBuf){.pData = pMsgInfo->pMsg, .len = pMsgInfo->msgLen, .handle = NULL};
pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management
pMsgInfo->pMsg = NULL; // pMsg transferred to SMsgSendInfo management
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildMsgInfoImpl(pRequest);
......@@ -211,14 +209,12 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQuery* pQuery) {
int32_t getPlan(SRequestObj* pRequest, SQuery* pQuery, SQueryPlan** pPlan, SArray* pNodeList) {
pRequest->type = pQuery->msgType;
SPlanContext cxt = {
.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite
};
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
SPlanContext cxt = {.queryId = pRequest->requestId,
.acctId = pRequest->pTscObj->acctId,
.mgmtEpSet = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp),
.pAstRoot = pQuery->pRoot,
.showRewrite = pQuery->showRewrite};
int32_t code = qCreateQueryPlan(&cxt, pPlan, pNodeList);
if (code != 0) {
return code;
}
......@@ -234,10 +230,10 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
for (int32_t i = 0; i < pResInfo->numOfCols; ++i) {
pResInfo->fields[i].bytes = pSchema[i].bytes;
pResInfo->fields[i].type = pSchema[i].type;
pResInfo->fields[i].type = pSchema[i].type;
pResInfo->userFields[i].bytes = pSchema[i].bytes;
pResInfo->userFields[i].type = pSchema[i].type;
pResInfo->userFields[i].type = pSchema[i].type;
if (pSchema[i].type == TSDB_DATA_TYPE_VARCHAR) {
pResInfo->userFields[i].bytes -= VARSTR_HEADER_SIZE;
......@@ -254,7 +250,8 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter;
SQueryResult res = {.code = 0, .numOfRows = 0, .msgSize = ERROR_MSG_BUF_DEFAULT_SIZE, .msg = pRequest->msgBuf};
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr, pRequest->metric.start, &res);
int32_t code = schedulerExecJob(pTransporter, pNodeList, pDag, &pRequest->body.queryJob, pRequest->sqlstr,
pRequest->metric.start, &res);
if (code != TSDB_CODE_SUCCESS) {
if (pRequest->body.queryJob != 0) {
schedulerFreeJob(pRequest->body.queryJob);
......@@ -274,14 +271,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList
}
pRequest->code = res.code;
terrno = res.code;
terrno = res.code;
return pRequest->code;
}
SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
SQuery* pQuery = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
SQuery* pQuery = NULL;
SArray* pNodeList = taosArrayInit(4, sizeof(struct SQueryNodeAddr));
int32_t code = buildRequest(pTscObj, sql, sqlLen, &pRequest);
if (TSDB_CODE_SUCCESS == code) {
......@@ -320,15 +317,15 @@ SRequestObj* execQueryImpl(STscObj* pTscObj, const char* sql, int sqlLen) {
}
int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SCatalog *pCatalog = NULL;
int32_t code = 0;
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
int32_t tblNum = taosArrayGetSize(pRequest->tableList);
SCatalog* pCatalog = NULL;
int32_t code = 0;
int32_t dbNum = taosArrayGetSize(pRequest->dbList);
int32_t tblNum = taosArrayGetSize(pRequest->tableList);
if (dbNum <= 0 && tblNum <= 0) {
return TSDB_CODE_QRY_APP_ERROR;
}
code = catalogGetHandle(pTscObj->pAppInfo->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -337,8 +334,8 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
SEpSet epset = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
for (int32_t i = 0; i < dbNum; ++i) {
char *dbFName = taosArrayGet(pRequest->dbList, i);
char* dbFName = taosArrayGet(pRequest->dbList, i);
code = catalogRefreshDBVgInfo(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, dbFName);
if (code != TSDB_CODE_SUCCESS) {
return code;
......@@ -346,7 +343,7 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
}
for (int32_t i = 0; i < tblNum; ++i) {
SName *tableName = taosArrayGet(pRequest->tableList, i);
SName* tableName = taosArrayGet(pRequest->tableList, i);
code = catalogRefreshTableMeta(pCatalog, pTscObj->pAppInfo->pTransporter, &epset, tableName, -1);
if (code != TSDB_CODE_SUCCESS) {
......@@ -357,11 +354,10 @@ int32_t refreshMeta(STscObj* pTscObj, SRequestObj* pRequest) {
return code;
}
SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
SRequestObj* pRequest = NULL;
int32_t retryNum = 0;
int32_t code = 0;
int32_t retryNum = 0;
int32_t code = 0;
while (retryNum++ < REQUEST_MAX_TRY_TIMES) {
pRequest = execQueryImpl(pTscObj, sql, sqlLen);
......@@ -377,7 +373,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) {
destroyRequest(pRequest);
}
return pRequest;
}
......@@ -509,7 +505,8 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
}
bool persistConnForSpecificMsg(void* parenct, tmsg_t msgType) {
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP || msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
return msgType == TDMT_VND_QUERY_RSP || msgType == TDMT_VND_FETCH_RSP || msgType == TDMT_VND_RES_READY_RSP ||
msgType == TDMT_VND_QUERY_HEARTBEAT_RSP;
}
void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
......@@ -536,10 +533,10 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
int32_t elapsed = pRequest->metric.rsp - pRequest->metric.start;
if (pMsg->code == TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " message:%s, code:%s rspLen:%d, elapsed:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
} else {
tscError("0x%" PRIx64 " SQL cmd:%s, code:%s rspLen:%d, elapsed time:%d ms, reqId:0x%" PRIx64, pRequest->self,
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed/1000, pRequest->requestId);
TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), pMsg->contLen, elapsed / 1000, pRequest->requestId);
}
taosReleaseRef(clientReqRefPool, pSendInfo->requestObjRefId);
......@@ -590,7 +587,7 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
return taos_connect(ipStr, userStr, passStr, dbStr, port);
}
static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
SResultColumn* pCol = &pResultInfo->pCol[i];
......@@ -722,8 +719,8 @@ _return:
static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL) {
pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
pResInfo->row = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
pResInfo->pCol = taosMemoryCalloc(pResInfo->numOfCols, sizeof(SResultColumn));
pResInfo->length = taosMemoryCalloc(pResInfo->numOfCols, sizeof(int32_t));
pResInfo->convertBuf = taosMemoryCalloc(pResInfo->numOfCols, POINTER_BYTES);
......@@ -770,7 +767,8 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
return TSDB_CODE_SUCCESS;
}
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows, bool convertUcs4) {
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows,
bool convertUcs4) {
assert(numOfCols > 0 && pFields != NULL && pResultInfo != NULL);
if (numOfRows == 0) {
return TSDB_CODE_SUCCESS;
......@@ -841,15 +839,16 @@ void resetConnectDB(STscObj* pTscObj) {
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4) {
assert(pResultInfo != NULL && pRsp != NULL);
pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->pRspMsg = (const char*)pRsp;
pResultInfo->pData = (void*)pRsp->data;
pResultInfo->numOfRows = htonl(pRsp->numOfRows);
pResultInfo->current = 0;
pResultInfo->completed = (pRsp->completed == 1);
pResultInfo->payloadLen = htonl(pRsp->compLen);
pResultInfo->precision = pRsp->precision;
pResultInfo->precision = pRsp->precision;
// TODO handle the compressed case
pResultInfo->totalRows += pResultInfo->numOfRows;
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows, convertUcs4);
return setResultDataPtr(pResultInfo, pResultInfo->fields, pResultInfo->numOfCols, pResultInfo->numOfRows,
convertUcs4);
}
......@@ -71,7 +71,7 @@ void taos_cleanup(void) {
tscInfo("all local resources released");
}
setConfRet taos_set_config(const char *config) {
setConfRet taos_set_config(const char *config) {
// TODO
setConfRet ret = {SET_CONF_RET_SUCC, {0}};
return ret;
......@@ -133,8 +133,7 @@ int taos_field_count(TAOS_RES *res) {
return 0;
}
SRequestObj *pRequest = (SRequestObj *)res;
SReqResultInfo *pResInfo = &pRequest->body.resInfo;
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return pResInfo->numOfCols;
}
......@@ -145,7 +144,7 @@ TAOS_FIELD *taos_fetch_fields(TAOS_RES *res) {
return NULL;
}
SReqResultInfo *pResInfo = &(((SRequestObj *)res)->body.resInfo);
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return pResInfo->userFields;
}
......@@ -162,13 +161,36 @@ TAOS_ROW taos_fetch_row(TAOS_RES *res) {
return NULL;
}
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return NULL;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return NULL;
}
return doFetchRow(pRequest, true, true);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *msg = ((SMqRspObj *)res);
SReqResultInfo *pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
return doFetchRow(pRequest, true, true);
if (pResultInfo->row == NULL) {
msg->resIter++;
pResultInfo = taosArrayGet(msg->res, msg->resIter);
doSetOneRowPtr(pResultInfo);
pResultInfo->current += 1;
}
return pResultInfo->row;
} else {
// assert to avoid uninitialization error
ASSERT(0);
}
return NULL;
}
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
......@@ -260,12 +282,12 @@ int *taos_fetch_lengths(TAOS_RES *res) {
return NULL;
}
return ((SRequestObj *)res)->body.resInfo.length;
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return pResInfo->length;
}
TAOS_ROW *taos_result_block(TAOS_RES *res) {
SRequestObj* pRequest = (SRequestObj*) res;
if (pRequest == NULL) {
if (res == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
......@@ -274,7 +296,8 @@ TAOS_ROW *taos_result_block(TAOS_RES *res) {
return NULL;
}
return &pRequest->body.resInfo.row;
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
return &pResInfo->row;
}
// todo intergrate with tDataTypes
......@@ -313,7 +336,7 @@ const char *taos_data_type(int type) {
const char *taos_get_client_info() { return version; }
int taos_affected_rows(TAOS_RES *res) {
if (res == NULL) {
if (res == NULL || TD_RES_TMQ(res)) {
return 0;
}
......@@ -323,12 +346,17 @@ int taos_affected_rows(TAOS_RES *res) {
}
int taos_result_precision(TAOS_RES *res) {
SRequestObj* pRequest = (SRequestObj*) res;
if (pRequest == NULL) {
if (res == NULL) {
return TSDB_TIME_PRECISION_MILLI;
}
return pRequest->body.resInfo.precision;
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
return pRequest->body.resInfo.precision;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *info = tmqGetCurResInfo(res);
return info->precision;
}
return TSDB_TIME_PRECISION_MILLI;
}
int taos_select_db(TAOS *taos, const char *db) {
......@@ -370,90 +398,115 @@ void taos_stop_query(TAOS_RES *res) {
}
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
SRequestObj *pRequestObj = res;
SReqResultInfo *pResultInfo = &pRequestObj->body.resInfo;
SReqResultInfo *pResultInfo = tscGetCurResInfo(res);
if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) {
return true;
}
SResultColumn *pCol = &pRequestObj->body.resInfo.pCol[col];
SResultColumn *pCol = &pResultInfo->pCol[col];
return colDataIsNull_f(pCol->nullbitmap, row);
}
bool taos_is_update_query(TAOS_RES *res) {
return taos_num_fields(res) == 0;
}
bool taos_is_update_query(TAOS_RES *res) { return taos_num_fields(res) == 0; }
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
int32_t numOfRows = 0;
/*int32_t code = */taos_fetch_block_s(res, &numOfRows, rows);
/*int32_t code = */ taos_fetch_block_s(res, &numOfRows, rows);
return numOfRows;
}
int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows) {
if (res == NULL) {
return 0;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
(*rows) = NULL;
(*numOfRows) = 0;
(*rows) = NULL;
(*numOfRows) = 0;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
doFetchRow(pRequest, false, true);
doFetchRow(pRequest, false, true);
// TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
// TODO refactor
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
(*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows;
return pRequest->code;
}
(*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows;
return pRequest->code;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) return -1;
int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
pResultInfo->current = pResultInfo->numOfRows;
(*rows) = pResultInfo->row;
(*numOfRows) = pResultInfo->numOfRows;
return 0;
} else {
ASSERT(0);
return -1;
}
}
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) {
if (res == NULL) {
return 0;
}
if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res;
doFetchRow(pRequest, false, false);
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || taos_num_fields(res) == 0) {
return 0;
}
doFetchRow(pRequest, false, false);
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
SReqResultInfo *pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void*) pResultInfo->pData;
} else if (TD_RES_TMQ(res)) {
SReqResultInfo *pResultInfo = tmqGetNextResInfo(res);
if (pResultInfo == NULL) return -1;
return 0;
pResultInfo->current = pResultInfo->numOfRows;
(*numOfRows) = pResultInfo->numOfRows;
(*pData) = (void *)pResultInfo->pData;
return 0;
} else {
ASSERT(0);
return -1;
}
}
int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex) {
SRequestObj *pRequest = (SRequestObj *)res;
if (pRequest == NULL) {
if (res == NULL) {
return 0;
}
int32_t numOfFields = taos_num_fields(pRequest);
int32_t numOfFields = taos_num_fields(res);
if (columnIndex < 0 || columnIndex >= numOfFields || numOfFields == 0) {
return 0;
}
TAOS_FIELD* pField = &pRequest->body.resInfo.userFields[columnIndex];
SReqResultInfo *pResInfo = tscGetCurResInfo(res);
TAOS_FIELD *pField = &pResInfo->userFields[columnIndex];
if (!IS_VAR_DATA_TYPE(pField->type)) {
return 0;
}
return pRequest->body.resInfo.pCol[columnIndex].offset;
return pResInfo->pCol[columnIndex].offset;
}
int taos_validate_sql(TAOS *taos, const char *sql) { return true; }
......@@ -483,18 +536,19 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
// TODO
}
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) {
// TODO
return NULL;
TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp,
void *param, int interval) {
// TODO
return NULL;
}
TAOS_RES *taos_consume(TAOS_SUB *tsub) {
// TODO
return NULL;
// TODO
return NULL;
}
void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) {
// TODO
// TODO
}
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
......@@ -553,26 +607,26 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
}
int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) {
// TODO
return -1;
// TODO
return -1;
}
int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
// TODO
return -1;
// TODO
return -1;
}
int taos_stmt_add_batch(TAOS_STMT* stmt) {
// TODO
return -1;
int taos_stmt_add_batch(TAOS_STMT *stmt) {
// TODO
return -1;
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
// TODO
return NULL;
// TODO
return NULL;
}
int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) {
// TODO
return -1;
int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
// TODO
return -1;
}
......@@ -17,25 +17,19 @@
#include "clientLog.h"
#include "parser.h"
#include "planner.h"
#include "scheduler.h"
#include "tdatablock.h"
#include "tdef.h"
#include "tglobal.h"
#include "tmsgtype.h"
#include "tpagedbuf.h"
#include "tqueue.h"
#include "tref.h"
typedef struct {
int32_t curBlock;
int32_t curRow;
void** uData;
} SMqRowIter;
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
void* vg;
SMqRowIter iter;
SArray* res; // SArray<SReqResultInfo>
int32_t resIter;
};
struct tmq_list_t {
......@@ -849,7 +843,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
if (msgEpoch < tmqEpoch) {
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
/*tsem_post(&tmq->rspSem);*/
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch, tmqEpoch);
tscWarn("msg discard from vg %d since from earlier epoch, rsp epoch %d, current epoch %d", pParam->vgId, msgEpoch,
tmqEpoch);
return 0;
}
......@@ -886,8 +881,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg);
pRsp->iter.curBlock = 0;
pRsp->iter.curRow = 0;
/*pRsp->iter.curBlock = 0;*/
/*pRsp->iter.curRow = 0;*/
// TODO: alloc mem
/*pRsp->*/
/*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/
......@@ -899,8 +894,8 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
}
#endif
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, pRsp->msg.reqOffset,
pRsp->msg.rspOffset);
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId,
pRsp->msg.reqOffset, pRsp->msg.rspOffset);
pRsp->vg = pParam->pVg;
taosWriteQitem(tmq->mqueue, pRsp);
......@@ -921,7 +916,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, topicNumGet);
tscDebug("consumer %ld update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch,
topicNumGet);
SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic));
if (newTopics == NULL) {
return false;
......@@ -1289,7 +1285,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t transporterId = 0;
/*printf("send poll\n");*/
atomic_add_fetch_32(&tmq->waitingRequest, 1);
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId,
pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++;
......@@ -1566,30 +1563,4 @@ const char* tmq_err2str(tmq_resp_err_t err) {
return "fail";
}
TAOS_ROW tmq_get_row(tmq_message_t* message) {
SMqPollRsp* rsp = &message->msg;
while (1) {
if (message->iter.curBlock < taosArrayGetSize(rsp->pBlockData)) {
SSDataBlock* pBlock = taosArrayGet(rsp->pBlockData, message->iter.curBlock);
if (message->iter.curRow < pBlock->info.rows) {
for (int i = 0; i < pBlock->info.numOfCols; i++) {
SColumnInfoData* pData = taosArrayGet(pBlock->pDataBlock, i);
if (colDataIsNull_s(pData, message->iter.curRow))
message->iter.uData[i] = NULL;
else {
message->iter.uData[i] = colDataGetData(pData, message->iter.curRow);
}
}
message->iter.curRow++;
return message->iter.uData;
} else {
message->iter.curBlock++;
message->iter.curRow = 0;
continue;
}
}
return NULL;
}
}
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
......@@ -256,7 +256,7 @@ static int32_t taosLoadCfg(SConfig *pCfg, const char *inputCfgDir, const char *e
return 0;
}
static int32_t taosAddClientLogCfg(SConfig *pCfg) {
int32_t taosAddClientLogCfg(SConfig *pCfg) {
if (cfgAddDir(pCfg, "configDir", configDir, 1) != 0) return -1;
if (cfgAddDir(pCfg, "scriptDir", configDir, 1) != 0) return -1;
if (cfgAddDir(pCfg, "logDir", tsLogDir, 1) != 0) return -1;
......
......@@ -308,6 +308,7 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) {
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.nTagCols);
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].type);
tlen += taosEncodeFixedI8(buf, pReq->stbCfg.pTagSchema[i].index);
tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pTagSchema[i].colId);
tlen += taosEncodeFixedI32(buf, pReq->stbCfg.pTagSchema[i].bytes);
tlen += taosEncodeString(buf, pReq->stbCfg.pTagSchema[i].name);
......@@ -378,6 +379,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) {
pReq->stbCfg.pTagSchema = (SSchema *)taosMemoryMalloc(pReq->stbCfg.nTagCols * sizeof(SSchema));
for (col_id_t i = 0; i < pReq->stbCfg.nTagCols; ++i) {
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].type));
buf = taosDecodeFixedI8(buf, &(pReq->stbCfg.pTagSchema[i].index));
buf = taosDecodeFixedI16(buf, &pReq->stbCfg.pTagSchema[i].colId);
buf = taosDecodeFixedI32(buf, &pReq->stbCfg.pTagSchema[i].bytes);
buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name);
......@@ -1989,7 +1991,7 @@ void tFreeSUseDbBatchRsp(SUseDbBatchRsp *pRsp) {
taosArrayDestroy(pRsp->pArray);
}
int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
int32_t tSerializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2002,7 +2004,7 @@ int32_t tSerializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
return tlen;
}
int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
int32_t tDeserializeSDbCfgReq(void *buf, int32_t bufLen, SDbCfgReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2014,7 +2016,7 @@ int32_t tDeserializeSDbCfgReq(void* buf, int32_t bufLen, SDbCfgReq* pReq) {
return 0;
}
int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2045,7 +2047,7 @@ int32_t tSerializeSDbCfgRsp(void* buf, int32_t bufLen, const SDbCfgRsp* pRsp) {
return tlen;
}
int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
int32_t tDeserializeSDbCfgRsp(void *buf, int32_t bufLen, SDbCfgRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2076,7 +2078,7 @@ int32_t tDeserializeSDbCfgRsp(void* buf, int32_t bufLen, SDbCfgRsp* pRsp) {
return 0;
}
int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
int32_t tSerializeSUserIndexReq(void *buf, int32_t bufLen, SUserIndexReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2089,7 +2091,7 @@ int32_t tSerializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq)
return tlen;
}
int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq) {
int32_t tDeserializeSUserIndexReq(void *buf, int32_t bufLen, SUserIndexReq *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2101,7 +2103,7 @@ int32_t tDeserializeSUserIndexReq(void* buf, int32_t bufLen, SUserIndexReq* pReq
return 0;
}
int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp* pRsp) {
int32_t tSerializeSUserIndexRsp(void *buf, int32_t bufLen, const SUserIndexRsp *pRsp) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......@@ -2118,7 +2120,7 @@ int32_t tSerializeSUserIndexRsp(void* buf, int32_t bufLen, const SUserIndexRsp*
return tlen;
}
int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp) {
int32_t tDeserializeSUserIndexRsp(void *buf, int32_t bufLen, SUserIndexRsp *pRsp) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
......@@ -2134,7 +2136,6 @@ int32_t tDeserializeSUserIndexRsp(void* buf, int32_t bufLen, SUserIndexRsp* pRsp
return 0;
}
int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
......
......@@ -1028,13 +1028,18 @@ int32_t taosVariantTypeSetType(SVariant *pVariant, char type) {
char * taosVariantGet(SVariant *pVar, int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_TIMESTAMP:
return (char *)&pVar->i;
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
return (char *)&pVar->u;
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_FLOAT:
return (char *)&pVar->d;
......@@ -1042,7 +1047,7 @@ char * taosVariantGet(SVariant *pVar, int32_t type) {
return (char *)pVar->pz;
case TSDB_DATA_TYPE_NCHAR:
return (char *)pVar->ucs4;
default:
default:
return NULL;
}
......
......@@ -58,7 +58,7 @@ int32_t bmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to create bnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId);
return -1;
} else {
return bmOpen(pWrapper);
return dndOpenNode(pWrapper);
}
}
......@@ -77,6 +77,7 @@ int32_t bmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to drop bnode since %s", terrstr());
return -1;
} else {
// dndCloseNode(pWrapper);
return bmDrop(pWrapper);
}
}
......
......@@ -25,11 +25,10 @@ static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
static void dmGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->uptime = (taosGetTimestampMs() - pDnode->rebootTime) / (86400000.0f);
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, MNODE);
if (pWrapper != NULL) {
pInfo->has_mnode = pWrapper->required;
dndReleaseWrapper(pWrapper);
}
pInfo->has_mnode = pDnode->wrappers[MNODE].required;
pInfo->has_qnode = pDnode->wrappers[QNODE].required;
pInfo->has_snode = pDnode->wrappers[SNODE].required;
pInfo->has_bnode = pDnode->wrappers[BNODE].required;
tstrncpy(pInfo->logdir.name, tsLogDir, sizeof(pInfo->logdir.name));
pInfo->logdir.size = tsLogSpace.size;
tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name));
......@@ -65,7 +64,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
bool getFromAPI = !tsMultiProcess;
pWrapper = &pDnode->wrappers[MNODE];
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
if (dndMarkWrapper(pWrapper) == 0) {
mmGetMonitorInfo(pWrapper, &mmInfo);
dndReleaseWrapper(pWrapper);
}
......@@ -82,7 +81,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
pWrapper = &pDnode->wrappers[VNODES];
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
if (dndMarkWrapper(pWrapper) == 0) {
vmGetMonitorInfo(pWrapper, &vmInfo);
dndReleaseWrapper(pWrapper);
}
......@@ -99,7 +98,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
pWrapper = &pDnode->wrappers[QNODE];
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
if (dndMarkWrapper(pWrapper) == 0) {
qmGetMonitorInfo(pWrapper, &qmInfo);
dndReleaseWrapper(pWrapper);
}
......@@ -116,7 +115,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
pWrapper = &pDnode->wrappers[SNODE];
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
if (dndMarkWrapper(pWrapper) == 0) {
smGetMonitorInfo(pWrapper, &smInfo);
dndReleaseWrapper(pWrapper);
}
......@@ -133,7 +132,7 @@ void dmSendMonitorReport(SDnode *pDnode) {
pWrapper = &pDnode->wrappers[BNODE];
if (getFromAPI) {
if (dndMarkWrapper(pWrapper) != 0) {
if (dndMarkWrapper(pWrapper) == 0) {
bmGetMonitorInfo(pWrapper, &bmInfo);
dndReleaseWrapper(pWrapper);
}
......
......@@ -27,7 +27,7 @@ extern "C" {
typedef struct SBnodeMgmt {
SBnode *pBnode;
SDnode *pDnode;
SMgmtWrapper *pWrapper;
SMgmtWrapper *pWrapper;
const char *path;
SMultiWorker writeWorker;
SSingleWorker monitorWorker;
......
......@@ -126,6 +126,7 @@ typedef struct SDnode {
int32_t numOfDisks;
uint16_t serverPort;
bool dropped;
EProcType procType;
EDndType ntype;
EDndStatus status;
EDndEvent event;
......
......@@ -27,7 +27,81 @@ static bool dndRequireNode(SMgmtWrapper *pWrapper) {
return required;
}
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
static int32_t dndInitNodeProc(SMgmtWrapper *pWrapper) {
int32_t shmsize = tsMnodeShmSize;
if (pWrapper->ntype == VNODES) {
shmsize = tsVnodeShmSize;
} else if (pWrapper->ntype == QNODE) {
shmsize = tsQnodeShmSize;
} else if (pWrapper->ntype == SNODE) {
shmsize = tsSnodeShmSize;
} else if (pWrapper->ntype == MNODE) {
shmsize = tsMnodeShmSize;
} else if (pWrapper->ntype == BNODE) {
shmsize = tsBnodeShmSize;
} else {
return -1;
}
if (taosCreateShm(&pWrapper->shm, pWrapper->ntype, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
SProcCfg cfg = dndGenProcCfg(pWrapper);
cfg.isChild = false;
pWrapper->procType = PROC_PARENT;
pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dndNewNodeProc(SMgmtWrapper *pWrapper, EDndType n) {
char tstr[8] = {0};
char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n);
args[1] = "-c";
args[2] = configDir;
args[3] = "-n";
args[4] = tstr;
args[5] = NULL;
int32_t pid = taosNewProc(args);
if (pid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
return -1;
}
pWrapper->procId = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0;
}
static int32_t dndRunNodeProc(SMgmtWrapper *pWrapper) {
if (pWrapper->pDnode->ntype == NODE_MAX) {
dInfo("node:%s, should be started manually", pWrapper->name);
} else {
if (dndNewNodeProc(pWrapper, pWrapper->ntype) != 0) {
return -1;
}
}
if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
return 0;
}
static int32_t dndOpenNodeImp(SMgmtWrapper *pWrapper) {
if (taosMkDir(pWrapper->path) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to create dir:%s since %s", pWrapper->name, pWrapper->path, terrstr());
......@@ -44,7 +118,19 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
return 0;
}
void dndCloseNode(SMgmtWrapper *pWrapper) {
int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
SDnode *pDnode = pWrapper->pDnode;
if (pDnode->procType == PROC_SINGLE) {
return dndOpenNodeImp(pWrapper);
} else if (pDnode->procType == PROC_PARENT) {
if (dndInitNodeProc(pWrapper) != 0) return -1;
if (dndWriteShmFile(pDnode) != 0) return -1;
if (dndRunNodeProc(pWrapper) != 0) return -1;
}
return 0;
}
static void dndCloseNodeImp(SMgmtWrapper *pWrapper) {
dDebug("node:%s, mgmt start to close", pWrapper->name);
pWrapper->required = false;
taosWLockLatch(&pWrapper->latch);
......@@ -65,27 +151,17 @@ void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, mgmt has been closed", pWrapper->name);
}
static int32_t dndNewProc(SMgmtWrapper *pWrapper, EDndType n) {
char tstr[8] = {0};
char *args[6] = {0};
snprintf(tstr, sizeof(tstr), "%d", n);
args[1] = "-c";
args[2] = configDir;
args[3] = "-n";
args[4] = tstr;
args[5] = NULL;
int32_t pid = taosNewProc(args);
if (pid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
dError("node:%s, failed to exec in new process since %s", pWrapper->name, terrstr());
return -1;
void dndCloseNode(SMgmtWrapper *pWrapper) {
if (pWrapper->pDnode->procType == PROC_PARENT) {
if (pWrapper->procId > 0 && taosProcExist(pWrapper->procId)) {
dInfo("node:%s, send kill signal to the child process:%d", pWrapper->name, pWrapper->procId);
taosKillProc(pWrapper->procId);
dInfo("node:%s, wait for child process:%d to stop", pWrapper->name, pWrapper->procId);
taosWaitProc(pWrapper->procId);
dInfo("node:%s, child process:%d is stopped", pWrapper->name, pWrapper->procId);
}
}
pWrapper->procId = pid;
dInfo("node:%s, continue running in new process:%d", pWrapper->name, pid);
return 0;
dndCloseNodeImp(pWrapper);
}
static void dndProcessProcHandle(void *handle) {
......@@ -96,13 +172,14 @@ static void dndProcessProcHandle(void *handle) {
static int32_t dndRunInSingleProcess(SDnode *pDnode) {
dInfo("dnode run in single process");
pDnode->procType = PROC_SINGLE;
for (EDndType n = DNODE; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
if (dndOpenNode(pWrapper) != 0) {
if (dndOpenNodeImp(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......@@ -136,8 +213,10 @@ static int32_t dndRunInSingleProcess(SDnode *pDnode) {
static int32_t dndRunInParentProcess(SDnode *pDnode) {
dInfo("dnode run in parent process");
pDnode->procType = PROC_PARENT;
SMgmtWrapper *pDWrapper = &pDnode->wrappers[DNODE];
if (dndOpenNode(pDWrapper) != 0) {
if (dndOpenNodeImp(pDWrapper) != 0) {
dError("node:%s, failed to start since %s", pDWrapper->name, terrstr());
return -1;
}
......@@ -146,36 +225,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) continue;
int32_t shmsize = tsMnodeShmSize;
if (n == VNODES) {
shmsize = tsVnodeShmSize;
} else if (n == QNODE) {
shmsize = tsQnodeShmSize;
} else if (n == SNODE) {
shmsize = tsSnodeShmSize;
} else if (n == MNODE) {
shmsize = tsMnodeShmSize;
} else if (n == BNODE) {
shmsize = tsBnodeShmSize;
} else {
}
if (taosCreateShm(&pWrapper->shm, n, shmsize) != 0) {
terrno = TAOS_SYSTEM_ERROR(terrno);
dError("node:%s, failed to create shm size:%d since %s", pWrapper->name, shmsize, terrstr());
return -1;
}
dInfo("node:%s, shm:%d is created, size:%d", pWrapper->name, pWrapper->shm.id, shmsize);
SProcCfg cfg = dndGenProcCfg(pWrapper);
cfg.isChild = false;
pWrapper->procType = PROC_PARENT;
pWrapper->pProc = taosProcInit(&cfg);
if (pWrapper->pProc == NULL) {
dError("node:%s, failed to create proc since %s", pWrapper->name, terrstr());
return -1;
}
if (dndInitNodeProc(pWrapper) != 0) return -1;
}
if (dndWriteShmFile(pDnode) != 0) {
......@@ -186,19 +236,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
for (EDndType n = DNODE + 1; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pDnode->ntype == NODE_MAX) {
dInfo("node:%s, should be started manually", pWrapper->name);
} else {
if (dndNewProc(pWrapper, n) != 0) {
return -1;
}
}
if (taosProcRun(pWrapper->pProc) != 0) {
dError("node:%s, failed to run proc since %s", pWrapper->name, terrstr());
return -1;
}
if (dndRunNodeProc(pWrapper) != 0) return -1;
}
dndSetStatus(pDnode, DND_STAT_RUNNING);
......@@ -239,7 +277,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
if (pWrapper->procId <= 0 || !taosProcExist(pWrapper->procId)) {
dWarn("node:%s, process:%d is killed and needs to be restarted", pWrapper->name, pWrapper->procId);
taosProcCloseHandles(pWrapper->pProc, dndProcessProcHandle);
dndNewProc(pWrapper, n);
dndNewNodeProc(pWrapper, n);
}
}
}
......@@ -253,6 +291,7 @@ static int32_t dndRunInParentProcess(SDnode *pDnode) {
static int32_t dndRunInChildProcess(SDnode *pDnode) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[pDnode->ntype];
dInfo("%s run in child process", pWrapper->name);
pDnode->procType = PROC_CHILD;
pWrapper->required = dndRequireNode(pWrapper);
if (!pWrapper->required) {
......@@ -264,7 +303,7 @@ static int32_t dndRunInChildProcess(SDnode *pDnode) {
tmsgSetDefaultMsgCb(&msgCb);
pWrapper->procType = PROC_CHILD;
if (dndOpenNode(pWrapper) != 0) {
if (dndOpenNodeImp(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
......
......@@ -80,6 +80,7 @@ int32_t mmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to drop mnode since %s", terrstr());
return -1;
} else {
// dndCloseNode(pWrapper);
return mmDrop(pWrapper);
}
}
......
......@@ -58,7 +58,7 @@ int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to create qnode since %s", terrstr());
return -1;
} else {
return qmOpen(pWrapper);
return dndOpenNode(pWrapper);
}
}
......@@ -77,6 +77,7 @@ int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to drop qnode since %s", terrstr());
return -1;
} else {
// dndCloseNode(pWrapper);
return qmDrop(pWrapper);
}
}
......
......@@ -32,7 +32,7 @@ static void qmProcessMonitorQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
if (pMsg->rpcMsg.msgType == TDMT_MON_SM_INFO) {
if (pMsg->rpcMsg.msgType == TDMT_MON_QM_INFO) {
code = qmProcessGetMonQmInfoReq(pMgmt->pWrapper, pMsg);
} else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
......
......@@ -58,7 +58,7 @@ int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
dError("failed to create snode since %s", terrstr());
return -1;
} else {
return smOpen(pWrapper);
return dndOpenNode(pWrapper);
}
}
......@@ -78,6 +78,7 @@ int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
return -1;
} else {
return smDrop(pWrapper);
// return dndCloseNode(pWrapper);
}
}
......
......@@ -55,6 +55,8 @@ target_include_directories(
vnode
PUBLIC "inc"
PRIVATE "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
)
target_link_libraries(
vnode
......@@ -69,6 +71,7 @@ target_link_libraries(
PUBLIC scheduler
PUBLIC tdb
#PUBLIC bdb
#PUBLIC scalar
PUBLIC transport
PUBLIC stream
)
......
......@@ -92,9 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput,
return false;
}
// NOTE: there are four bytes of an integer more than the required buffer space.
// struct size + data payload + length for each column + bitmap length
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockDataGetSerialMetaSize(pInput->pData) + blockDataGetSize(pInput->pData);
pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData);
pBuf->pData = taosMemoryMalloc(pBuf->allocSize);
if (pBuf->pData == NULL) {
......
......@@ -390,7 +390,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.checkFunc = checkAndGetResultType,
.getEnvFunc = NULL,
.initFunc = NULL,
.sprocessFunc = NULL,
.sprocessFunc = castFunction,
.finalizeFunc = NULL
},
{
......@@ -600,7 +600,13 @@ int32_t checkAndGetResultType(SFunctionNode* pFunc) {
break;
}
case FUNCTION_TYPE_CAST: {
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes, .type = TSDB_DATA_TYPE_BIGINT };
//type
SValueNode* pParam = nodesListGetNode(pFunc->pParameterList, 1);
int32_t paraType = pParam->datum.i;
//bytes
pParam = nodesListGetNode(pFunc->pParameterList, 2);
int32_t paraBytes = pParam->datum.i;
pFunc->node.resType = (SDataType) { .bytes = paraBytes, .type = paraType};
break;
}
......
......@@ -375,6 +375,9 @@ static void monGenDnodeJson(SMonInfo *pMonitor) {
tjsonAddDoubleToObject(pJson, "vnodes_num", pStat->totalVnodes);
tjsonAddDoubleToObject(pJson, "masters", pStat->masterNum);
tjsonAddDoubleToObject(pJson, "has_mnode", pInfo->has_mnode);
tjsonAddDoubleToObject(pJson, "has_qnode", pInfo->has_qnode);
tjsonAddDoubleToObject(pJson, "has_snode", pInfo->has_snode);
tjsonAddDoubleToObject(pJson, "has_bnode", pInfo->has_bnode);
}
static void monGenDiskJson(SMonInfo *pMonitor) {
......@@ -530,7 +533,7 @@ void monSendReport() {
if (pCont != NULL) {
EHttpCompFlag flag = tsMonitor.cfg.comp ? HTTP_GZIP : HTTP_FLAT;
if (taosSendHttpReport(tsMonitor.cfg.server, tsMonitor.cfg.port, pCont, strlen(pCont), flag) != 0) {
uError("failed to send monitor msg since %s", terrstr());
uError("failed to send monitor msg");
}
taosMemoryFree(pCont);
}
......
......@@ -194,9 +194,9 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) {
if (tDecodeCStrTo(decoder, desc.database_name) < 0) return -1;
if (tDecodeCStrTo(decoder, desc.status) < 0) return -1;
for (int32_t j = 0; j < TSDB_MAX_REPLICA; ++j) {
SMonVnodeDesc vdesc = {0};
if (tDecodeI32(decoder, &vdesc.dnode_id) < 0) return -1;
if (tDecodeCStrTo(decoder, vdesc.vnode_role) < 0) return -1;
SMonVnodeDesc *pVDesc = &desc.vnodes[j];
if (tDecodeI32(decoder, &pVDesc->dnode_id) < 0) return -1;
if (tDecodeCStrTo(decoder, pVDesc->vnode_role) < 0) return -1;
}
taosArrayPush(pInfo->vgroups, &desc);
}
......@@ -205,15 +205,15 @@ int32_t tDecodeSMonVgroupInfo(SCoder *decoder, SMonVgroupInfo *pInfo) {
int32_t tEncodeSMonGrantInfo(SCoder *encoder, const SMonGrantInfo *pInfo) {
if (tEncodeI32(encoder, pInfo->expire_time) < 0) return -1;
if (tEncodeI32(encoder, pInfo->timeseries_used) < 0) return -1;
if (tEncodeI32(encoder, pInfo->timeseries_total) < 0) return -1;
if (tEncodeI64(encoder, pInfo->timeseries_used) < 0) return -1;
if (tEncodeI64(encoder, pInfo->timeseries_total) < 0) return -1;
return 0;
}
int32_t tDecodeSMonGrantInfo(SCoder *decoder, SMonGrantInfo *pInfo) {
if (tDecodeI32(decoder, &pInfo->expire_time) < 0) return -1;
if (tDecodeI32(decoder, &pInfo->timeseries_used) < 0) return -1;
if (tDecodeI32(decoder, &pInfo->timeseries_total) < 0) return -1;
if (tDecodeI64(decoder, &pInfo->timeseries_used) < 0) return -1;
if (tDecodeI64(decoder, &pInfo->timeseries_total) < 0) return -1;
return 0;
}
......
......@@ -257,13 +257,20 @@ SNodeList* addValueNodeFromTypeToList(SAstCreateContext* pCxt, SDataType dataTyp
char buf[64] = {0};
//add value node for type
snprintf(buf, sizeof(buf), "%u", dataType.type);
SToken token = {.type = TSDB_DATA_TYPE_TINYINT, .n = strlen(buf), .z = buf};
SToken token = {.type = TSDB_DATA_TYPE_SMALLINT, .n = strlen(buf), .z = buf};
SNode* pNode = createValueNode(pCxt, token.type, &token);
addNodeToList(pCxt, pList, pNode);
//add value node for bytes
memset(buf, 0, sizeof(buf));
snprintf(buf, sizeof(buf), "%u", dataType.bytes);
int32_t bytes;
if (IS_VAR_DATA_TYPE(dataType.type)) {
bytes = (dataType.type == TSDB_DATA_TYPE_NCHAR) ? dataType.bytes * TSDB_NCHAR_SIZE : dataType.bytes;
bytes += VARSTR_HEADER_SIZE;
} else {
bytes = dataType.bytes;
}
snprintf(buf, sizeof(buf), "%d", bytes);
token.type = TSDB_DATA_TYPE_BIGINT;
token.n = strlen(buf);
token.z = buf;
......
......@@ -21,7 +21,7 @@
#include "parUtil.h"
#include "ttime.h"
#define GET_OPTION_VAL(pVal, defaultVal) (NULL == (pVal) ? (defaultVal) : ((SValueNode*)(pVal))->datum.i)
#define GET_OPTION_VAL(pVal, defaultVal) (NULL == (pVal) ? (defaultVal) : getBigintFromValueNode((SValueNode*)(pVal)))
typedef struct STranslateContext {
SParseContext* pParseCxt;
......@@ -380,6 +380,7 @@ static EDealRes translateColumn(STranslateContext* pCxt, SColumnNode* pCol) {
static EDealRes translateValue(STranslateContext* pCxt, SValueNode* pVal) {
uint8_t precision = (NULL != pCxt->pCurrStmt ? pCxt->pCurrStmt->precision : pVal->node.resType.precision);
pVal->node.resType.precision = precision;
if (pVal->isDuration) {
if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) !=
TSDB_CODE_SUCCESS) {
......@@ -452,6 +453,9 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
}
pOp->node.resType.type = TSDB_DATA_TYPE_DOUBLE;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes;
} else {
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
}
return DEAL_RES_CONTINUE;
}
......@@ -1041,6 +1045,27 @@ static int32_t translateSelect(STranslateContext* pCxt, SSelectStmt* pSelect) {
return code;
}
static int64_t getUnitPerMinute(uint8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
return MILLISECOND_PER_MINUTE;
case TSDB_TIME_PRECISION_MICRO:
return MILLISECOND_PER_MINUTE * 1000L;
case TSDB_TIME_PRECISION_NANO:
return NANOSECOND_PER_MINUTE;
default:
break;
}
return MILLISECOND_PER_MINUTE;
}
static int64_t getBigintFromValueNode(SValueNode* pVal) {
if (pVal->isDuration) {
return pVal->datum.i / getUnitPerMinute(pVal->node.resType.precision);
}
return pVal->datum.i;
}
static int32_t buildCreateDbRetentions(const SNodeList* pRetentions, SCreateDbReq* pReq) {
if (NULL != pRetentions) {
pReq->pRetensions = taosArrayInit(LIST_LENGTH(pRetentions), sizeof(SRetention));
......@@ -1098,7 +1123,10 @@ static int32_t checkRangeOption(STranslateContext* pCxt, const char* pName, SVal
if (DEAL_RES_ERROR == translateValue(pCxt, pVal)) {
return pCxt->errCode;
}
int64_t val = pVal->datum.i;
if (pVal->isDuration && (TIME_UNIT_MINUTE != pVal->unit && TIME_UNIT_HOUR != pVal->unit && TIME_UNIT_DAY != pVal->unit)) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_OPTION_UNIT, pName, pVal->unit);
}
int64_t val = getBigintFromValueNode(pVal);
if (val < minVal || val > maxVal) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_RANGE_OPTION, pName, val, minVal, maxVal);
}
......@@ -1187,9 +1215,18 @@ static int32_t checkKeepOption(STranslateContext* pCxt, SNodeList* pKeep) {
}
}
int32_t daysToKeep0 = ((SValueNode*)nodesListGetNode(pKeep, 0))->datum.i;
int32_t daysToKeep1 = ((SValueNode*)nodesListGetNode(pKeep, 1))->datum.i;
int32_t daysToKeep2 = ((SValueNode*)nodesListGetNode(pKeep, 2))->datum.i;
SValueNode* pKeep0 = (SValueNode*)nodesListGetNode(pKeep, 0);
SValueNode* pKeep1 = (SValueNode*)nodesListGetNode(pKeep, 1);
SValueNode* pKeep2 = (SValueNode*)nodesListGetNode(pKeep, 2);
if ((pKeep0->isDuration && (TIME_UNIT_MINUTE != pKeep0->unit && TIME_UNIT_HOUR != pKeep0->unit && TIME_UNIT_DAY != pKeep0->unit)) ||
(pKeep1->isDuration && (TIME_UNIT_MINUTE != pKeep1->unit && TIME_UNIT_HOUR != pKeep1->unit && TIME_UNIT_DAY != pKeep1->unit)) ||
(pKeep2->isDuration && (TIME_UNIT_MINUTE != pKeep2->unit && TIME_UNIT_HOUR != pKeep2->unit && TIME_UNIT_DAY != pKeep2->unit))) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_KEEP_UNIT, pKeep0->unit, pKeep1->unit, pKeep2->unit);
}
int32_t daysToKeep0 = getBigintFromValueNode(pKeep0);
int32_t daysToKeep1 = getBigintFromValueNode(pKeep1);
int32_t daysToKeep2 = getBigintFromValueNode(pKeep2);
if (daysToKeep0 < TSDB_MIN_KEEP || daysToKeep1 < TSDB_MIN_KEEP || daysToKeep2 < TSDB_MIN_KEEP ||
daysToKeep0 > TSDB_MAX_KEEP || daysToKeep1 > TSDB_MAX_KEEP || daysToKeep2 > TSDB_MAX_KEEP) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_INVALID_KEEP_VALUE, daysToKeep0, daysToKeep1, daysToKeep2,
......@@ -1240,8 +1277,7 @@ static int32_t checkDatabaseOptions(STranslateContext* pCxt, SDatabaseOptions* p
code = checkRangeOption(pCxt, "compression", pOptions->pCompressionLevel, TSDB_MIN_COMP_LEVEL, TSDB_MAX_COMP_LEVEL);
}
if (TSDB_CODE_SUCCESS == code) {
code =
checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
code = checkRangeOption(pCxt, "daysPerFile", pOptions->pDaysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
}
if (TSDB_CODE_SUCCESS == code) {
code = checkRangeOption(pCxt, "fsyncPeriod", pOptions->pFsyncPeriod, TSDB_MIN_FSYNC_PERIOD, TSDB_MAX_FSYNC_PERIOD);
......
......@@ -62,35 +62,39 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_INTERVAL_VALUE_TOO_SMALL:
return "This interval value is too small : %s";
case TSDB_CODE_PAR_DB_NOT_SPECIFIED:
return "db not specified";
return "Database not specified";
case TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME:
return "Invalid identifier name : %s";
case TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR:
return "corresponding super table not in this db";
return "Corresponding super table not in this db";
case TSDB_CODE_PAR_INVALID_RANGE_OPTION:
return "invalid option %s: %"PRId64" valid range: [%d, %d]";
return "Invalid option %s: %"PRId64" valid range: [%d, %d]";
case TSDB_CODE_PAR_INVALID_STR_OPTION:
return "invalid option %s: %s";
return "Invalid option %s: %s";
case TSDB_CODE_PAR_INVALID_ENUM_OPTION:
return "invalid option %s: %"PRId64", only %d, %d allowed";
return "Invalid option %s: %"PRId64", only %d, %d allowed";
case TSDB_CODE_PAR_INVALID_TTL_OPTION:
return "invalid option ttl: %"PRId64", should be greater than or equal to %d";
return "Invalid option ttl: %"PRId64", should be greater than or equal to %d";
case TSDB_CODE_PAR_INVALID_KEEP_NUM:
return "invalid number of keep options";
return "Invalid number of keep options";
case TSDB_CODE_PAR_INVALID_KEEP_ORDER:
return "invalid keep value, should be keep0 <= keep1 <= keep2";
return "Invalid keep value, should be keep0 <= keep1 <= keep2";
case TSDB_CODE_PAR_INVALID_KEEP_VALUE:
return "invalid option keep: %d, %d, %d valid range: [%d, %d]";
return "Invalid option keep: %d, %d, %d valid range: [%d, %d]";
case TSDB_CODE_PAR_INVALID_COMMENT_OPTION:
return "invalid option comment, length cannot exceed %d";
return "Invalid option comment, length cannot exceed %d";
case TSDB_CODE_PAR_INVALID_F_RANGE_OPTION:
return "invalid option %s: %f valid range: [%d, %d]";
return "Invalid option %s: %f valid range: [%d, %d]";
case TSDB_CODE_PAR_INVALID_ROLLUP_OPTION:
return "invalid option rollup: only one function is allowed";
return "Invalid option rollup: only one function is allowed";
case TSDB_CODE_PAR_INVALID_RETENTIONS_OPTION:
return "invalid option retentions";
return "Invalid option retentions";
case TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST:
return "GROUP BY and WINDOW-clause can't be used together";
case TSDB_CODE_PAR_INVALID_OPTION_UNIT:
return "Invalid option %s unit: %c, only m, h, d allowed";
case TSDB_CODE_PAR_INVALID_KEEP_UNIT:
return "Invalid option keep unit: %c, %c, %c, only m, h, d allowed";
case TSDB_CODE_OUT_OF_MEMORY:
return "Out of memory";
default:
......
......@@ -226,6 +226,16 @@ TEST_F(ParserTest, selectExpression) {
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectCondition) {
setDatabase("root", "test");
bind("SELECT c1 FROM t1 where ts in (true, false)");
ASSERT_TRUE(run());
bind("SELECT * FROM t1 where c1 > 10 and c1 is not null");
ASSERT_TRUE(run());
}
TEST_F(ParserTest, selectPseudoColumn) {
setDatabase("root", "test");
......
......@@ -647,6 +647,159 @@ int32_t substrFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOu
return TSDB_CODE_SUCCESS;
}
int32_t castFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
if (inputNum!= 3) {
return TSDB_CODE_FAILED;
}
int16_t inputType = pInput[0].columnData->info.type;
int16_t outputType = *(int16_t *)pInput[1].columnData->pData;
if (outputType != TSDB_DATA_TYPE_BIGINT && outputType != TSDB_DATA_TYPE_UBIGINT &&
outputType != TSDB_DATA_TYPE_VARCHAR && outputType != TSDB_DATA_TYPE_NCHAR &&
outputType != TSDB_DATA_TYPE_TIMESTAMP) {
return TSDB_CODE_FAILED;
}
int64_t outputLen = *(int64_t *)pInput[2].columnData->pData;
char *input = NULL;
char *outputBuf = taosMemoryCalloc(outputLen * pInput[0].numOfRows, 1);
char *output = outputBuf;
if (IS_VAR_DATA_TYPE(inputType)) {
input = pInput[0].columnData->pData + pInput[0].columnData->varmeta.offset[0];
} else {
input = pInput[0].columnData->pData;
}
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) {
if (colDataIsNull_s(pInput[0].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i);
continue;
}
switch(outputType) {
case TSDB_DATA_TYPE_BIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input));
*(int64_t *)output = strtoll(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
if (len < 0) {
taosMemoryFree(newBuf);
return TSDB_CODE_FAILED;
}
newBuf[len] = 0;
*(int64_t *)output = strtoll(newBuf, NULL, 10);
taosMemoryFree(newBuf);
} else {
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
}
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
if (inputType == TSDB_DATA_TYPE_BINARY) {
memcpy(output, varDataVal(input), varDataLen(input));
*(uint64_t *)output = strtoull(output, NULL, 10);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
char *newBuf = taosMemoryCalloc(1, outputLen * TSDB_NCHAR_SIZE + 1);
int32_t len = taosUcs4ToMbs((TdUcs4 *)varDataVal(input), varDataLen(input), newBuf);
if (len < 0) {
taosMemoryFree(newBuf);
return TSDB_CODE_FAILED;
}
newBuf[len] = 0;
*(uint64_t *)output = strtoull(newBuf, NULL, 10);
taosMemoryFree(newBuf);
} else {
GET_TYPED_DATA(*(uint64_t *)output, uint64_t, inputType, input);
}
break;
}
case TSDB_DATA_TYPE_TIMESTAMP: {
if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) {
//not support
return TSDB_CODE_FAILED;
} else {
GET_TYPED_DATA(*(int64_t *)output, int64_t, inputType, input);
}
break;
}
case TSDB_DATA_TYPE_BINARY: {
if (inputType == TSDB_DATA_TYPE_BOOL) {
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), *(int8_t *)input ? "true" : "false");
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = sprintf(varDataVal(output), "%.*s", (int32_t)(outputLen - VARSTR_HEADER_SIZE), varDataVal(input));
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY || inputType == TSDB_DATA_TYPE_NCHAR) {
//not support
return TSDB_CODE_FAILED;
} else {
char tmp[400] = {0};
NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
int32_t len = (int32_t)strlen(tmp);
len = (outputLen - VARSTR_HEADER_SIZE) > len ? len : (outputLen - VARSTR_HEADER_SIZE);
memcpy(varDataVal(output), tmp, len);
varDataSetLen(output, len);
}
break;
}
case TSDB_DATA_TYPE_NCHAR: {
int32_t outputCharLen = (outputLen - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
if (inputType == TSDB_DATA_TYPE_BOOL) {
char tmp[8] = {0};
int32_t len = sprintf(tmp, "%.*s", outputCharLen, *(int8_t *)input ? "true" : "false" );
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
return TSDB_CODE_FAILED;
}
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_BINARY) {
int32_t len = outputCharLen > varDataLen(input) ? varDataLen(input) : outputCharLen;
bool ret = taosMbsToUcs4(input + VARSTR_HEADER_SIZE, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
return TSDB_CODE_FAILED;
}
varDataSetLen(output, len);
} else if (inputType == TSDB_DATA_TYPE_NCHAR) {
int32_t len = MIN(outputLen, varDataLen(input) + VARSTR_HEADER_SIZE);
memcpy(output, input, len);
varDataSetLen(output, len - VARSTR_HEADER_SIZE);
} else {
char tmp[400] = {0};
NUM_TO_STRING(inputType, input, sizeof(tmp), tmp);
int32_t len = (int32_t)strlen(tmp);
len = outputCharLen > len ? len : outputCharLen;
bool ret = taosMbsToUcs4(tmp, len, (TdUcs4 *)varDataVal(output), outputLen - VARSTR_HEADER_SIZE, &len);
if (!ret) {
return TSDB_CODE_FAILED;
}
varDataSetLen(output, len);
}
break;
}
default: {
return TSDB_CODE_FAILED;
}
}
colDataAppend(pOutput->columnData, i, output, false);
if (IS_VAR_DATA_TYPE(inputType)) {
input += varDataTLen(input);
} else {
input += tDataTypes[inputType].bytes;
}
if (IS_VAR_DATA_TYPE(outputType)) {
output += varDataTLen(output);
} else {
output += tDataTypes[outputType].bytes;
}
}
pOutput->numOfRows = pInput->numOfRows;
taosMemoryFree(outputBuf);
return TSDB_CODE_SUCCESS;
}
int32_t atanFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
return doScalarFunctionUnique(pInput, inputNum, pOutput, atan);
......
......@@ -164,8 +164,8 @@ int32_t taosSendHttpReport(const char* server, uint16_t port, char* pCont, int32
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
terrno = 0;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
uv_run(loop, UV_RUN_DEFAULT);
uv_loop_close(loop);
taosMemoryFree(connect);
......
......@@ -24,7 +24,7 @@ int32_t taosNewProc(char **args) {
if (pid == 0) {
args[0] = tsProcPath;
// close(STDIN_FILENO);
close(STDOUT_FILENO);
// close(STDOUT_FILENO);
// close(STDERR_FILENO);
return execvp(tsProcPath, args);
} else {
......
......@@ -369,53 +369,33 @@ int32_t taosGetCpuCores(float *numOfCores) {
#endif
}
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
*cpu_system = 0;
*cpu_engine = 0;
return 0;
#elif defined(_TD_DARWIN_64)
void taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
static int64_t lastSysUsed = 0;
static int64_t lastSysTotal = 0;
static int64_t lastProcTotal = 0;
static int64_t curSysUsed = 0;
static int64_t curSysTotal = 0;
static int64_t curProcTotal = 0;
*cpu_system = 0;
*cpu_engine = 0;
return 0;
#else
static uint64_t lastSysUsed = 0;
static uint64_t lastSysTotal = 0;
static uint64_t lastProcTotal = 0;
SysCpuInfo sysCpu;
ProcCpuInfo procCpu;
if (taosGetSysCpuInfo(&sysCpu) != 0) {
return -1;
}
if (taosGetProcCpuInfo(&procCpu) != 0) {
return -1;
}
uint64_t curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system;
uint64_t curSysTotal = curSysUsed + sysCpu.idle;
uint64_t curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
SysCpuInfo sysCpu = {0};
ProcCpuInfo procCpu = {0};
if (taosGetSysCpuInfo(&sysCpu) == 0 && taosGetProcCpuInfo(&procCpu) == 0) {
curSysUsed = sysCpu.user + sysCpu.nice + sysCpu.system;
curSysTotal = curSysUsed + sysCpu.idle;
curProcTotal = procCpu.utime + procCpu.stime + procCpu.cutime + procCpu.cstime;
if (lastSysUsed == 0 || lastSysTotal == 0 || lastProcTotal == 0) {
lastSysUsed = curSysUsed > 1 ? curSysUsed : 1;
lastSysTotal = curSysTotal > 1 ? curSysTotal : 1;
lastProcTotal = curProcTotal > 1 ? curProcTotal : 1;
return -1;
}
if (curSysTotal > lastSysTotal && curSysUsed >= lastSysUsed && curProcTotal >= lastProcTotal) {
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
}
if (curSysTotal == lastSysTotal) {
return -1;
lastSysUsed = curSysUsed;
lastSysTotal = curSysTotal;
lastProcTotal = curProcTotal;
}
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
lastSysUsed = curSysUsed;
lastSysTotal = curSysTotal;
lastProcTotal = curProcTotal;
return 0;
#endif
}
int32_t taosGetTotalMemory(int64_t *totalKB) {
......@@ -618,7 +598,6 @@ void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, i
static int64_t last_wchars = 0;
static int64_t last_read_bytes = 0;
static int64_t last_write_bytes = 0;
static int64_t cur_rchars = 0;
static int64_t cur_wchars = 0;
static int64_t cur_read_bytes = 0;
......@@ -632,6 +611,11 @@ void taosGetProcIODelta(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, i
last_wchars = cur_wchars;
last_read_bytes = cur_read_bytes;
last_write_bytes = cur_write_bytes;
} else {
*rchars = 0;
*wchars = 0;
*read_bytes = 0;
*write_bytes = 0;
}
}
......@@ -693,7 +677,6 @@ int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) {
static int64_t last_receive_bytes = 0;
static int64_t last_transmit_bytes = 0;
static int64_t cur_receive_bytes = 0;
static int64_t cur_transmit_bytes = 0;
if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) == 0) {
......@@ -701,6 +684,9 @@ void taosGetCardInfoDelta(int64_t *receive_bytes, int64_t *transmit_bytes) {
*transmit_bytes = cur_transmit_bytes - last_transmit_bytes;
last_receive_bytes = cur_receive_bytes;
last_transmit_bytes = cur_transmit_bytes;
} else {
*receive_bytes = 0;
*transmit_bytes = 0;
}
}
......
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c monitorfqdn -v localhost
system sh/cfg.sh -n dnode1 -c monitorport -v 80
system sh/cfg.sh -n dnode1 -c monitorInterval -v 1
system sh/cfg.sh -n dnode1 -c monitorComp -v 1
#system sh/cfg.sh -n dnode1 -c supportVnodes -v 128
system sh/exec.sh -n dnode1 -s start
sql connect
print =============== show dnodes
sleep 2000
sql create database db vgroups 2;
sleep 2000
print =============== create drop qnode 1
sql create qnode on dnode 1
sql create snode on dnode 1
sql create bnode on dnode 1
return
print =============== restart
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
return
system sh/deploy.sh -n dnode2 -i 2
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s start
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready
endi
sql connect
print =============== create database
sql create database db
sql show databases
if $rows != 2 then
return -1
endi
sql use db
print =============== create super table and child table
sql create table stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int)
sql show stables
print $rows $data00 $data01 $data02
if $rows != 1 then
return -1
endi
sql create table ct1 using stb1 tags ( 1 )
sql create table ct2 using stb1 tags ( 2 )
sql create table ct3 using stb1 tags ( 3 )
sql create table ct4 using stb1 tags ( 4 )
sql show tables
print $rows $data00 $data10 $data20
if $rows != 4 then
return -1
endi
print =============== insert data into child table ct1 (s)
sql insert into ct1 values ( '2022-01-01 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct1 values ( '2022-01-01 01:01:06.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct1 values ( '2022-01-01 01:01:10.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct1 values ( '2022-01-01 01:01:16.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct1 values ( '2022-01-01 01:01:20.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct1 values ( '2022-01-01 01:01:26.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now+6a )
sql insert into ct1 values ( '2022-01-01 01:01:30.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", now+7a )
sql insert into ct1 values ( '2022-01-01 01:01:36.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", now+8a )
print =============== insert data into child table ct4 (y)
sql insert into ct4 values ( '2019-01-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
sql insert into ct4 values ( '2019-10-21 01:01:01.000', 1, 11111, 111, 11, 1.11, 11.11, 1, "binary1", "nchar1", now+1a )
sql insert into ct4 values ( '2019-12-31 01:01:01.000', 2, 22222, 222, 22, 2.22, 22.22, 0, "binary2", "nchar2", now+2a )
sql insert into ct4 values ( '2020-01-01 01:01:06.000', 3, 33333, 333, 33, 3.33, 33.33, 0, "binary3", "nchar3", now+3a )
sql insert into ct4 values ( '2020-05-07 01:01:10.000', 4, 44444, 444, 44, 4.44, 44.44, 1, "binary4", "nchar4", now+4a )
sql insert into ct4 values ( '2020-09-30 01:01:16.000', 5, 55555, 555, 55, 5.55, 55.55, 0, "binary5", "nchar5", now+5a )
sql insert into ct4 values ( '2020-12-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
sql insert into ct4 values ( '2021-02-01 01:01:20.000', 6, 66666, 666, 66, 6.66, 66.66, 1, "binary6", "nchar6", now+6a )
sql insert into ct4 values ( '2021-10-28 01:01:26.000', 7, 00000, 000, 00, 0.00, 00.00, 1, "binary7", "nchar7", "1970-01-01 08:00:00.000" )
sql insert into ct4 values ( '2021-12-01 01:01:30.000', 8, -88888, -888, -88, -8.88, -88.88, 0, "binary8", "nchar8", "1969-01-01 01:00:00.000" )
sql insert into ct4 values ( '2022-02-31 01:01:36.000', 9, -99999999999999999, -999, -99, -9.99, -999999999999999999999.99, 1, "binary9", "nchar9", "1900-01-01 00:00:00.000" )
sql insert into ct4 values ( '2022-05-21 01:01:01.000', NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL )
print ================ start query ======================
print ================ query 1 having condition
sql_error select c1 from ct1 group by c1 having count(c1)
sql_error select c1 from ct4 group by c1 having count(c1)
sql_error select count(c1) from ct1 group by c1 having count(c1)
sql select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> sql : select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c7) from ct4 group by c7 having count(c1) < sum(c1) ;
print ====> sql : select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 2 and sum(c1) > 2 ;
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 2 and sum(c1) > 2 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
print ================ query 1 complex with having condition
sql select count(c1) from ct4 where c1 > 2 group by c7 having count(c1) < 1 limit 1 offset 1
print ====> sql : select count(c1) from ct4 where c1 > 2 group by c7 having count(c1) < 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select abs(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select abs(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select acos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select acos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select asin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select asin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select atan(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select atan(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select ceil(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select ceil(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select cos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select cos(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select floor(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select floor(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select log(c1,10) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select log(c1,10) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select pow(c1,3) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select pow(c1,3) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select round(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select round(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select sqrt(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select sqrt(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select sin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select sin(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select tan(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select tan(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
print =================== count all rows
sql select count(c1) from stb1
print ====> sql : select count(c1) from stb1
print ====> rows: $data00
if $data00 != 20 then
return -1
endi
#=================================================
print =============== stop and restart taosd
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode1 -s start
$loop_cnt = 0
check_dnode_ready_0:
$loop_cnt = $loop_cnt + 1
sleep 200
if $loop_cnt == 10 then
print ====> dnode not ready!
return -1
endi
sql show dnodes
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05
if $data00 != 1 then
return -1
endi
if $data04 != ready then
goto check_dnode_ready_0
endi
print =================== count all rows
sql select count(c1) from stb1
print ====> sql : select count(c1) from stb1
print ====> rows: $data00
if $data00 != 20 then
return -1
endi
print ================ query 1 having condition
sql_error select c1 from ct1 group by c1 having count(c1)
sql_error select c1 from ct4 group by c1 having count(c1)
sql_error select count(c1) from ct1 group by c1 having count(c1)
sql select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> sql : select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c7) from ct4 group by c7 having count(c1) < sum(c1) ;
print ====> sql : select sum(c1) ,count(c7) from ct4 group by c7 having count(c7) > 1 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 2 and sum(c1) > 2 ;
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 2 and sum(c1) > 2 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
sql select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
print ====> sql : select sum(c1) ,count(c1) from ct4 group by c1 having count(c7) < 1 or sum(c1) > 2 ;
print ====> rows: $rows
if $rows != 2 then
return -1
endi
print ================ query 1 complex with having condition
sql select count(c1) from ct4 where c1 > 2 group by c7 having count(c1) < 1 limit 1 offset 1
print ====> sql : select count(c1) from ct4 where c1 > 2 group by c7 having count(c1) < 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select abs(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select abs(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select acos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select acos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select asin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select asin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select atan(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select atan(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select ceil(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select ceil(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select cos(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select cos(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select floor(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select floor(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select log(c1,10) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select log(c1,10) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select pow(c1,3) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select pow(c1,3) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select round(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select round(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select sqrt(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select sqrt(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select sin(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select sin(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
sql select tan(c1) from ct4 where c1 > 2 group by c1 having abs(c1) > 1 limit 1 offset 1
print ====> sql : select tan(c1) from ct4 where c1 > 2 group by c7 having abs(c1) > 1 limit 1 offset 1
print ====> rows: $rows
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
......@@ -4,13 +4,18 @@ sql connect
print ================ insert data
$dbNamme = d0
$tbPrefix = ct
$tbNum = 2
$tbNum = 10
$rowNum = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
sql use $dbNamme
$loop_cnt = 0
loop_insert:
print ====> loop $loop_cnt insert
$loop_cnt = $loop_cnt + 1
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
......@@ -32,9 +37,11 @@ while $i < $tbNum
$tstart = $tstart + 1
$x = $x + 1
endw
#print ====> insert rows: $rowNum into $tb and ntb
$i = $i + 1
$tstart = 1640966400000
# $tstart = 1640966400000
endw
goto loop_insert
......
......@@ -4,13 +4,18 @@ sql connect
print ================ insert data
$dbNamme = d1
$tbPrefix = ct
$tbNum = 2
$tbNum = 10
$rowNum = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
sql use $dbNamme
$loop_cnt = 0
loop_insert:
print ====> loop $loop_cnt insert
$loop_cnt = $loop_cnt + 1
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
......@@ -32,9 +37,11 @@ while $i < $tbNum
$tstart = $tstart + 1
$x = $x + 1
endw
#print ====> insert rows: $rowNum into $tb and ntb
$i = $i + 1
$tstart = 1640966400000
# $tstart = 1640966400000
endw
goto loop_insert
......
此差异已折叠。
此差异已折叠。
......@@ -26,6 +26,7 @@
#include "tglobal.h"
#include "ttypes.h"
#include "tutil.h"
#include "tconfig.h"
#include <regex.h>
#include <wordexp.h>
......@@ -90,6 +91,11 @@ TAOS *shellInit(SShellArguments *_args) {
_args->user = TSDB_DEFAULT_USER;
}
SConfig *pCfg = cfgInit();
if (NULL == pCfg) return NULL;
if (0 != taosAddClientLogCfg(pCfg)) return NULL;
// Connect to the database.
TAOS *con = NULL;
if (_args->auth == NULL) {
......
Subproject commit 33cdfe4f90a209f105c1b6091439798a9cde1e93
Subproject commit bf6c766986c61ff4fc80421fdea682a8fd4b5b32
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册