提交 56ea47e7 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-14481-3.0

...@@ -88,12 +88,6 @@ def pre_test(){ ...@@ -88,12 +88,6 @@ def pre_test(){
cmake .. > /dev/null cmake .. > /dev/null
make -j4> /dev/null make -j4> /dev/null
''' '''
sh'''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1 return 1
} }
...@@ -103,7 +97,6 @@ pipeline { ...@@ -103,7 +97,6 @@ pipeline {
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine' WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
} }
stages { stages {
stage('pre_build'){ stage('pre_build'){
...@@ -124,11 +117,6 @@ pipeline { ...@@ -124,11 +117,6 @@ pipeline {
./test-all.sh b1fq ./test-all.sh b1fq
''' '''
sh''' sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh'''
cd ${WKC}/debug cd ${WKC}/debug
ctest ctest
''' '''
......
...@@ -984,7 +984,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); ...@@ -984,7 +984,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void tFreeSShowRsp(SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp);
typedef struct { typedef struct {
int32_t type;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN]; char tb[TSDB_TABLE_NAME_LEN];
int64_t showId; int64_t showId;
......
...@@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData { ...@@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData {
SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData *pPTS; // primary timestamp column
SColumnInfoData **pData; SColumnInfoData **pData;
SColumnDataAgg **pColumnDataAgg; SColumnDataAgg **pColumnDataAgg;
uint64_t uid; // table uid
} SInputColumnInfoData; } SInputColumnInfoData;
// sql function runtime context // sql function runtime context
...@@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx { ...@@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset; int32_t offset;
......
...@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size ...@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
* @return * @return
*/ */
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot); __ext_compar_fn_t compar, char* buf, bool maxroot);
/** /**
* sort heap to make sure it is a max/min root heap * sort heap to make sure it is a max/min root heap
...@@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
* @return * @return
*/ */
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
const void *parswap, __ext_swap_fn_t swap, bool maxroot); bool maxroot);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -219,6 +219,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo); ...@@ -219,6 +219,7 @@ void doSetOneRowPtr(SReqResultInfo* pResultInfo);
void setResPrecision(SReqResultInfo* pResInfo, int32_t precision); void setResPrecision(SReqResultInfo* pResInfo, int32_t precision);
int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4); int32_t setQueryResultFromRsp(SReqResultInfo* pResultInfo, const SRetrieveTableRsp* pRsp, bool convertUcs4);
void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols); void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t numOfCols);
void doFreeReqResultInfo(SReqResultInfo* pResInfo);
static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) { static FORCE_INLINE SReqResultInfo* tmqGetCurResInfo(TAOS_RES* res) {
SMqRspObj* msg = (SMqRspObj*)res; SMqRspObj* msg = (SMqRspObj*)res;
......
...@@ -30,15 +30,15 @@ ...@@ -30,15 +30,15 @@
#define TSC_VAR_RELEASED 0 #define TSC_VAR_RELEASED 0
SAppInfo appInfo; SAppInfo appInfo;
int32_t clientReqRefPool = -1; int32_t clientReqRefPool = -1;
int32_t clientConnRefPool = -1; int32_t clientConnRefPool = -1;
static TdThreadOnce tscinit = PTHREAD_ONCE_INIT; static TdThreadOnce tscinit = PTHREAD_ONCE_INIT;
volatile int32_t tscInitRes = 0; volatile int32_t tscInitRes = 0;
static void registerRequest(SRequestObj *pRequest) { static void registerRequest(SRequestObj *pRequest) {
STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id); STscObj *pTscObj = acquireTscObj(pRequest->pTscObj->id);
assert(pTscObj != NULL); assert(pTscObj != NULL);
// connection has been released already, abort creating request. // connection has been released already, abort creating request.
...@@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) { ...@@ -49,8 +49,8 @@ static void registerRequest(SRequestObj *pRequest) {
if (pTscObj->pAppInfo) { if (pTscObj->pAppInfo) {
SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary; SInstanceSummary *pSummary = &pTscObj->pAppInfo->summary;
int32_t total = atomic_add_fetch_64((int64_t*)&pSummary->totalRequests, 1); int32_t total = atomic_add_fetch_64((int64_t *)&pSummary->totalRequests, 1);
int32_t currentInst = atomic_add_fetch_64((int64_t*)&pSummary->currentRequests, 1); int32_t currentInst = atomic_add_fetch_64((int64_t *)&pSummary->currentRequests, 1);
tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64 tscDebug("0x%" PRIx64 " new Request from connObj:0x%" PRIx64
", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64, ", current:%d, app current:%d, total:%d, reqId:0x%" PRIx64,
pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId); pRequest->self, pRequest->pTscObj->id, num, currentInst, total, pRequest->requestId);
...@@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) { ...@@ -60,16 +60,16 @@ static void registerRequest(SRequestObj *pRequest) {
static void deregisterRequest(SRequestObj *pRequest) { static void deregisterRequest(SRequestObj *pRequest) {
assert(pRequest != NULL); assert(pRequest != NULL);
STscObj * pTscObj = pRequest->pTscObj; STscObj *pTscObj = pRequest->pTscObj;
SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary; SInstanceSummary *pActivity = &pTscObj->pAppInfo->summary;
int32_t currentInst = atomic_sub_fetch_64((int64_t*)&pActivity->currentRequests, 1); int32_t currentInst = atomic_sub_fetch_64((int64_t *)&pActivity->currentRequests, 1);
int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1); int32_t num = atomic_sub_fetch_32(&pTscObj->numOfReqs, 1);
int64_t duration = taosGetTimestampUs() - pRequest->metric.start; int64_t duration = taosGetTimestampUs() - pRequest->metric.start;
tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64 tscDebug("0x%" PRIx64 " free Request from connObj: 0x%" PRIx64 ", reqId:0x%" PRIx64 " elapsed:%" PRIu64
" ms, current:%d, app current:%d", " ms, current:%d, app current:%d",
pRequest->self, pTscObj->id, pRequest->requestId, duration/1000, num, currentInst); pRequest->self, pTscObj->id, pRequest->requestId, duration / 1000, num, currentInst);
releaseTscObj(pTscObj->id); releaseTscObj(pTscObj->id);
} }
...@@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) { ...@@ -109,12 +109,12 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
} }
void closeAllRequests(SHashObj *pRequests) { void closeAllRequests(SHashObj *pRequests) {
void *pIter = taosHashIterate(pRequests, NULL); void *pIter = taosHashIterate(pRequests, NULL);
while (pIter != NULL) { while (pIter != NULL) {
int64_t *rid = pIter; int64_t *rid = pIter;
releaseRequest(*rid); releaseRequest(*rid);
pIter = taosHashIterate(pRequests, pIter); pIter = taosHashIterate(pRequests, pIter);
} }
} }
...@@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI ...@@ -144,7 +144,7 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
return NULL; return NULL;
} }
pObj->pAppInfo = pAppInfo; pObj->pAppInfo = pAppInfo;
tstrncpy(pObj->user, user, sizeof(pObj->user)); tstrncpy(pObj->user, user, sizeof(pObj->user));
memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN); memcpy(pObj->pass, auth, TSDB_PASSWORD_LEN);
...@@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI ...@@ -160,13 +160,9 @@ void *createTscObj(const char *user, const char *auth, const char *db, SAppInstI
return pObj; return pObj;
} }
STscObj *acquireTscObj(int64_t rid) { STscObj *acquireTscObj(int64_t rid) { return (STscObj *)taosAcquireRef(clientConnRefPool, rid); }
return (STscObj *)taosAcquireRef(clientConnRefPool, rid);
}
int32_t releaseTscObj(int64_t rid) { int32_t releaseTscObj(int64_t rid) { return taosReleaseRef(clientConnRefPool, rid); }
return taosReleaseRef(clientConnRefPool, rid);
}
void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) { void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t type) {
assert(pObj != NULL); assert(pObj != NULL);
...@@ -190,11 +186,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty ...@@ -190,11 +186,11 @@ void *createRequest(STscObj *pObj, __taos_async_fn_t fp, void *param, int32_t ty
tsem_init(&pRequest->body.rspSem, 0, 0); tsem_init(&pRequest->body.rspSem, 0, 0);
registerRequest(pRequest); registerRequest(pRequest);
return pRequest; return pRequest;
} }
static void doFreeReqResultInfo(SReqResultInfo *pResInfo) { void doFreeReqResultInfo(SReqResultInfo *pResInfo) {
taosMemoryFreeClear(pResInfo->pRspMsg); taosMemoryFreeClear(pResInfo->pRspMsg);
taosMemoryFreeClear(pResInfo->length); taosMemoryFreeClear(pResInfo->length);
taosMemoryFreeClear(pResInfo->row); taosMemoryFreeClear(pResInfo->row);
...@@ -217,7 +213,7 @@ static void doDestroyRequest(void *p) { ...@@ -217,7 +213,7 @@ static void doDestroyRequest(void *p) {
assert(RID_VALID(pRequest->self)); assert(RID_VALID(pRequest->self));
taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self)); taosHashRemove(pRequest->pTscObj->pRequests, &pRequest->self, sizeof(pRequest->self));
taosMemoryFreeClear(pRequest->msgBuf); taosMemoryFreeClear(pRequest->msgBuf);
taosMemoryFreeClear(pRequest->sqlstr); taosMemoryFreeClear(pRequest->sqlstr);
taosMemoryFreeClear(pRequest->pDb); taosMemoryFreeClear(pRequest->pDb);
...@@ -244,14 +240,9 @@ void destroyRequest(SRequestObj *pRequest) { ...@@ -244,14 +240,9 @@ void destroyRequest(SRequestObj *pRequest) {
taosRemoveRef(clientReqRefPool, pRequest->self); taosRemoveRef(clientReqRefPool, pRequest->self);
} }
SRequestObj *acquireRequest(int64_t rid) { SRequestObj *acquireRequest(int64_t rid) { return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid); }
return (SRequestObj *)taosAcquireRef(clientReqRefPool, rid);
}
int32_t releaseRequest(int64_t rid) {
return taosReleaseRef(clientReqRefPool, rid);
}
int32_t releaseRequest(int64_t rid) { return taosReleaseRef(clientReqRefPool, rid); }
void taos_init_imp(void) { void taos_init_imp(void) {
// In the APIs of other program language, taos_cleanup is not available yet. // In the APIs of other program language, taos_cleanup is not available yet.
...@@ -380,7 +371,7 @@ uint64_t generateRequestId() { ...@@ -380,7 +371,7 @@ uint64_t generateRequestId() {
} }
uint64_t id = 0; uint64_t id = 0;
while (true) { while (true) {
int64_t ts = taosGetTimestampMs(); int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId(); uint64_t pid = taosGetPId();
......
...@@ -135,6 +135,16 @@ void taos_free_result(TAOS_RES *res) { ...@@ -135,6 +135,16 @@ void taos_free_result(TAOS_RES *res) {
if (TD_RES_QUERY(res)) { if (TD_RES_QUERY(res)) {
SRequestObj *pRequest = (SRequestObj *)res; SRequestObj *pRequest = (SRequestObj *)res;
destroyRequest(pRequest); destroyRequest(pRequest);
} else if (TD_RES_TMQ(res)) {
SMqRspObj *pRsp = (SMqRspObj *)res;
if (pRsp->rsp.blockData) taosArrayDestroyP(pRsp->rsp.blockData, taosMemoryFree);
if (pRsp->rsp.blockDataLen) taosArrayDestroy(pRsp->rsp.blockDataLen);
if (pRsp->rsp.blockSchema) taosArrayDestroy(pRsp->rsp.blockSchema);
if (pRsp->rsp.blockTbName) taosArrayDestroy(pRsp->rsp.blockTbName);
if (pRsp->rsp.blockTags) taosArrayDestroy(pRsp->rsp.blockTags);
if (pRsp->rsp.blockTagSchema) taosArrayDestroy(pRsp->rsp.blockTagSchema);
pRsp->resInfo.pRspMsg = NULL;
doFreeReqResultInfo(&pRsp->resInfo);
} }
} }
......
...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { ...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from tu"); pRes = taos_query(pConn, "select now() from m1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -2441,7 +2441,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq ...@@ -2441,7 +2441,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1; if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2457,7 +2456,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR ...@@ -2457,7 +2456,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) { ...@@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) {
} }
SRetrieveTableReq retrieveReq = {0}; SRetrieveTableReq retrieveReq = {0};
retrieveReq.type = showType;
strcpy(retrieveReq.db, db); strcpy(retrieveReq.db, db);
strcpy(retrieveReq.tb, tb); strcpy(retrieveReq.tb, tb);
......
...@@ -391,7 +391,6 @@ typedef struct { ...@@ -391,7 +391,6 @@ typedef struct {
int16_t numOfColumns; int16_t numOfColumns;
int32_t rowSize; int32_t rowSize;
int32_t numOfRows; int32_t numOfRows;
int32_t payloadLen;
void* pIter; void* pIter;
SMnode* pMnode; SMnode* pMnode;
STableMetaRsp* pMeta; STableMetaRsp* pMeta;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#define SHOW_STEP_SIZE 100 #define SHOW_STEP_SIZE 100
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
static void mndFreeShowObj(SShowObj *pShow); static void mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
...@@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) { ...@@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) {
} }
} }
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) { static int32_t convertToRetrieveType(char* name, int32_t len) {
int32_t type = -1;
if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
type = TSDB_MGMT_TABLE_DNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) {
type = TSDB_MGMT_TABLE_MNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) {
type = TSDB_MGMT_TABLE_MODULE;
} else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) {
type = TSDB_MGMT_TABLE_QNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, len) == 0) {
type = TSDB_MGMT_TABLE_BNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
type = TSDB_MGMT_TABLE_SNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {
type = TSDB_MGMT_TABLE_CLUSTER;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, len) == 0) {
type = TSDB_MGMT_TABLE_DB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
type = TSDB_MGMT_TABLE_FUNC;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
// type = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
type = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
// type = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {
type = TSDB_MGMT_TABLE_USER;
} else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) {
type = TSDB_MGMT_TABLE_GRANTS;
} else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) {
type = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) {
type = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, len) == 0) {
type = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, len) == 0) {
type = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, len) == 0) {
type = TSDB_MGMT_TABLE_SMAS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) {
type = TSDB_MGMT_TABLE_CONFIGS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, len) == 0) {
type = TSDB_MGMT_TABLE_CONNS;
} else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) {
type = TSDB_MGMT_TABLE_QUERIES;
} else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
type = TSDB_MGMT_TABLE_VNODES;
} else {
// ASSERT(0);
}
return type;
}
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1);
int32_t size = sizeof(SShowObj) + pReq->payloadLen; int32_t size = sizeof(SShowObj);
SShowObj showObj = {0}; SShowObj showObj = {0};
showObj.id = showId; showObj.id = showId;
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = pReq->type; showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
showObj.payloadLen = pReq->payloadLen;
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
...@@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { ...@@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
} }
if (retrieveReq.showId == 0) { if (retrieveReq.showId == 0) {
SShowReq req = {0};
req.type = retrieveReq.type;
strncpy(req.db, retrieveReq.db, tListLen(req.db));
STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1); STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1);
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_MND_INVALID_INFOS_TBL; terrno = TSDB_CODE_MND_INVALID_INFOS_TBL;
...@@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { ...@@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
return -1; return -1;
} }
pShow = mndCreateShowObj(pMnode, &req); pShow = mndCreateShowObj(pMnode, &retrieveReq);
if (pShow == NULL) { if (pShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta req since %s", terrstr()); mError("failed to process show-meta req since %s", terrstr());
......
...@@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) { ...@@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); ASSERT_NE(pRsp->code, 0);
} }
TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
...@@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { ...@@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); ASSERT_NE(pRsp->code, 0);
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
......
...@@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo { ...@@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo {
void* pCur; // cursor for iterate the local table meta store. void* pCur; // cursor for iterate the local table meta store.
SArray* scanCols; // SArray<int16_t> scan column id list SArray* scanCols; // SArray<int16_t> scan column id list
int32_t type; // show type, TODO remove it // int32_t type; // show type, TODO remove it
SName name; SName name;
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t capacity; int32_t capacity;
...@@ -479,7 +479,7 @@ typedef struct { ...@@ -479,7 +479,7 @@ typedef struct {
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition; SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
...@@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo { ...@@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos; int32_t rightPos;
SColumnInfo rightCol; SColumnInfo rightCol;
SNode *pOnCondition; SNode *pOnCondition;
// SRspResultInfo resultInfo;
} SJoinOperatorInfo; } SJoinOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
......
...@@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx[i].currentStage = MAIN_SCAN; pCtx[i].currentStage = MAIN_SCAN;
SInputColumnInfoData* pInput = &pCtx[i].input; SInputColumnInfoData* pInput = &pCtx[i].input;
pInput->uid = pBlock->info.uid;
SExprInfo* pOneExpr = &pOperator->pExpr[i]; SExprInfo* pOneExpr = &pOperator->pExpr[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
...@@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column
ASSERT(pInput->pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
if (createDummyCol) { // todo avoid case: top(k, 12), 12 is the value parameter.
// sum(11), 11 is also the value parameter.
if (createDummyCol && pOneExpr->base.numOfParams == 1) {
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} }
} }
pCtx->resDataInfo.interBufSize = env.calcMemSize; pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element. pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
// for simple column, the intermediate buffer needs to hold one element.
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
} }
pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.numOfInputCols = pFunct->numOfParams;
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->pTsOutput = NULL; pCtx->pTsOutput = NULL;
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = TSDB_ORDER_ASC; pCtx->order = TSDB_ORDER_ASC;
pCtx->start.key = INT64_MIN; pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
#if 0 pCtx->numOfParams = pExpr->base.numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// int16_t type = pFunct->param[j].nType; pCtx->param = pFunct->pParam;
// int16_t bytes = pFunct->param[j].nLen; // for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// // set the order information for top/bottom query
// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { // int32_t functionId = pCtx->functionId;
// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// } else { // int32_t f = getExprFunctionId(&pExpr[0]);
// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); // assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
// } //
} // // pCtx->param[2].i = pQueryAttr->order.order;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// set the order information for top/bottom query // // pCtx->param[3].i = functionId;
int32_t functionId = pCtx->functionId; // // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { //
int32_t f = getExprFunctionId(&pExpr[0]); // // pCtx->param[1].i = pQueryAttr->order.col.info.colId;
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); // } else if (functionId == FUNCTION_INTERP) {
// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// pCtx->param[2].i = pQueryAttr->order.order; // // if (pQueryAttr->fillVal != NULL) {
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; // // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
pCtx->param[3].i = functionId; // // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; // // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// pCtx->param[1].i = pQueryAttr->order.col.info.colId; // // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
} else if (functionId == FUNCTION_INTERP) { // // }
// pCtx->param[2].i = (int8_t)pQueryAttr->fillType; // // }
// if (pQueryAttr->fillVal != NULL) { // // }
// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { // } else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; // // pCtx->param[1].i = pQueryAttr->window.skey;
// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value // // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { // // pCtx->param[2].i = pQueryAttr->window.ekey;
// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// } // } else if (functionId == FUNCTION_ARITHM) {
// } // // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
// } // }
} else if (functionId == FUNCTION_TS_COMP) { // }
// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].i = pQueryAttr->window.skey;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[2].i = pQueryAttr->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_ARITHM) {
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
}
#endif
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
...@@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
taosVariantDestroy(&pCtx[i].param[j]); taosVariantDestroy(&pCtx[i].param[j].param);
} }
taosVariantDestroy(&pCtx[i].tag); taosVariantDestroy(&pCtx[i].tag);
...@@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0; int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL) {
return NULL;
}
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
......
...@@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// The read handle is not initialized yet, since no qualified tables exists // The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
...@@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return p; return p;
} }
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
...@@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return NULL; return NULL;
} }
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio; pInfo->sampleRatio = sampleRatio;
pInfo->dataBlockLoadFlag = dataLoadFlag; pInfo->dataBlockLoadFlag= dataLoadFlag;
pInfo->pResBlock = pResBlock; pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition; pInfo->pFilterNode = pCondition;
pInfo->dataReader = pTsdbReadHandle; pInfo->dataReader = pDataReader;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->order = order; pInfo->order = order;
pInfo->current = 0; pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo; pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
tsem_destroy(&pInfo->ready); tsem_destroy(&pInfo->ready);
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
} }
} }
...@@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
// retrieve local table list info from vnode // retrieve local table list info from vnode
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
} }
...@@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
while (1) { while (1) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
if (pInfo->showRewrite) { if (pInfo->showRewrite) {
...@@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB ...@@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pInfo->scanCols = colList; pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
const char* name = tNameGetTableName(pName);
if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_DNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_MNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_MODULE;
} else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_QNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_BNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CLUSTER;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_DB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_FUNC;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) {
// tableType = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) {
// tableType = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_USER;
} else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_GRANTS;
} else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SMAS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONFIGS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONNS;
} else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_QUERIES;
} else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_VNODES;
} else {
ASSERT(0);
}
tNameAssign(&pInfo->name, pName); tNameAssign(&pInfo->name, pName);
pInfo->type = tableType; const char* name = tNameGetTableName(&pInfo->name);
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = pSysTableReadHandle; pInfo->readHandle = pSysTableReadHandle;
blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity);
} else { } else {
......
...@@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t ...@@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateTop, .translateFunc = translateTop,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = functionSetup,
.processFunc = maxFunction, .processFunc = topFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
...@@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "tbname", .name = "tbname",
.type = FUNCTION_TYPE_TBNAME, .type = FUNCTION_TYPE_TBNAME,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.translateFunc = NULL, .translateFunc = translateTbnameColumn,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = NULL, .sprocessFunc = NULL,
...@@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
}, },
{ {
.name = "_wendts", .name = "_wendts",
.type = FUNCTION_TYPE_QENDTS, .type = FUNCTION_TYPE_WENDTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv, .getEnvFunc = getTimePseudoFuncEnv,
......
...@@ -14,10 +14,11 @@ ...@@ -14,10 +14,11 @@
*/ */
#include "builtinsimpl.h" #include "builtinsimpl.h"
#include "tpercentile.h" #include <libs/nodes/querynodes.h>
#include "querynodes.h" #include "querynodes.h"
#include "taggfunction.h" #include "taggfunction.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tpercentile.h"
#define SET_VAL(_info, numOfElem, res) \ #define SET_VAL(_info, numOfElem, res) \
do { \ do { \
...@@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { ...@@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct STopBotRes {
int32_t num;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
return true;
}
typedef struct SStddevRes { typedef struct SStddevRes {
double result; double result;
int64_t count; int64_t count;
...@@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { ...@@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t* plist = (int8_t*)pCol->pData; int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
...@@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// TODO set the correct parameter.
void percentileFinalize(SqlFunctionCtx* pCtx) { void percentileFinalize(SqlFunctionCtx* pCtx) {
double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; SVariant* pVal = &pCtx->param[1].param;
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);
...@@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { ...@@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
} }
} }
typedef struct STopBotResItem {
SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
struct {
int32_t pageId;
int32_t offset;
} tuplePos; // tuple data of this chosen row
} STopBotResItem;
typedef struct STopBotRes {
int32_t num;
STopBotResItem *pItems;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes;
return true;
}
static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
pRes->pItems = (STopBotResItem*)((char*) pRes + sizeof(STopBotRes));
return pRes;
}
static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid);
int32_t topFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
assert(pRes->num >= 0);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid);
}
// treat the result as only one result
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) {
uint16_t type = *(uint16_t *) param;
STopBotResItem *val1 = (STopBotResItem *) p1;
STopBotResItem *val2 = (STopBotResItem *) p2;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
if (val1->v.i == val2->v.i) {
return 0;
}
return (val1->v.i > val2->v.i) ? 1 : -1;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
if (val1->v.u == val2->v.u) {
return 0;
}
return (val1->v.u > val2->v.u) ? 1 : -1;
}
if (val1->v.d == val2->v.d) {
return 0;
}
return (val1->v.d > val2->v.d) ? 1 : -1;
}
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) {
SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
STopBotResItem *pItems = pRes->pItems;
assert(pItems != NULL);
// not full yet
if (pRes->num < maxSize) {
STopBotResItem* pItem = &pItems[pRes->num];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
pRes->num++;
taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false);
} else { // replace the minimum value in the result
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
STopBotResItem* pItem = &pItems[pRes->num];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false);
}
}
}
void topBotFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
}
\ No newline at end of file
...@@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) { ...@@ -85,13 +85,19 @@ static EDealRes doNameExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static int32_t rewriteExpr(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) { static int32_t rewriteExprForSelect(SNodeList* pExprs, SSelectStmt* pSelect, ESqlClause clause) {
nodesWalkExprs(pExprs, doNameExpr, NULL); nodesWalkExprs(pExprs, doNameExpr, NULL);
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs }; SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt); nodesRewriteSelectStmt(pSelect, clause, doRewriteExpr, &cxt);
return cxt.errCode; return cxt.errCode;
} }
static int32_t rewriteExprs(SNodeList* pExprs, SNodeList* pTarget) {
SRewriteExprCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pExprs = pExprs };
nodesRewriteExprs(pTarget, doRewriteExpr, &cxt);
return cxt.errCode;
}
static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) { static int32_t pushLogicNode(SLogicPlanContext* pCxt, SLogicNode** pOldRoot, SLogicNode* pNewRoot) {
if (NULL == pNewRoot->pChildren) { if (NULL == pNewRoot->pChildren) {
pNewRoot->pChildren = nodesMakeList(); pNewRoot->pChildren = nodesMakeList();
...@@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, ...@@ -433,10 +439,10 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_GROUP_BY);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY); code = rewriteExprForSelect(pAgg->pAggFuncs, pSelect, SQL_CLAUSE_GROUP_BY);
} }
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pHaving) {
...@@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm ...@@ -472,7 +478,7 @@ static int32_t createWindowLogicNodeFinalize(SLogicPlanContext* pCxt, SSelectStm
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW); code = rewriteExprForSelect(pWindow->pFuncs, pSelect, SQL_CLAUSE_WINDOW);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe ...@@ -723,7 +729,7 @@ static int32_t createDistinctLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSe
// rewrite the expression in subsequent clauses // rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = rewriteExpr(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT); code = rewriteExprForSelect(pAgg->pGroupKeys, pSelect, SQL_CLAUSE_DISTINCT);
} }
// set the output // set the output
...@@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator ...@@ -850,6 +856,37 @@ static int32_t createSetOpProjectLogicNode(SLogicPlanContext* pCxt, SSetOperator
return code; return code;
} }
static int32_t createSetOpAggLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SAggLogicNode* pAgg = (SAggLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t code = TSDB_CODE_SUCCESS;
pAgg->pGroupKeys = nodesCloneList(pSetOperator->pProjectionList);
if (NULL == pAgg->pGroupKeys) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
// rewrite the expression in subsequent clauses
if (TSDB_CODE_SUCCESS == code) {
code = rewriteExprs(pAgg->pGroupKeys, pSetOperator->pOrderByList);
}
// set the output
if (TSDB_CODE_SUCCESS == code) {
code = createColumnByRewriteExps(pCxt, pAgg->pGroupKeys, &pAgg->node.pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg;
} else {
nodesDestroyNode(pAgg);
}
return code;
}
static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) { static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetOperator, SLogicNode** pLogicNode) {
SLogicNode* pSetOp = NULL; SLogicNode* pSetOp = NULL;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
...@@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO ...@@ -857,6 +894,9 @@ static int32_t createSetOpLogicNode(SLogicPlanContext* pCxt, SSetOperator* pSetO
case SET_OP_TYPE_UNION_ALL: case SET_OP_TYPE_UNION_ALL:
code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp); code = createSetOpProjectLogicNode(pCxt, pSetOperator, &pSetOp);
break; break;
case SET_OP_TYPE_UNION:
code = createSetOpAggLogicNode(pCxt, pSetOperator, &pSetOp);
break;
default: default:
code = -1; code = -1;
break; break;
......
...@@ -50,6 +50,11 @@ typedef struct SUaInfo { ...@@ -50,6 +50,11 @@ typedef struct SUaInfo {
SLogicSubplan* pSubplan; SLogicSubplan* pSubplan;
} SUaInfo; } SUaInfo;
typedef struct SUnInfo {
SAggLogicNode* pAgg;
SLogicSubplan* pSubplan;
} SUnInfo;
typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo); typedef bool (*FSplFindSplitNode)(SLogicSubplan* pSubplan, void* pInfo);
static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) { static SLogicSubplan* splCreateScanSubplan(SSplitContext* pCxt, SScanLogicNode* pScan, int32_t flag) {
...@@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) { ...@@ -226,7 +231,6 @@ static SLogicSubplan* uaCreateSubplan(SSplitContext* pCxt, SLogicNode* pNode) {
pSubplan->id.groupId = pCxt->groupId; pSubplan->id.groupId = pCxt->groupId;
pSubplan->subplanType = SUBPLAN_TYPE_SCAN; pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
pSubplan->pNode = pNode; pSubplan->pNode = pNode;
// TSWAP(pSubplan->pVgroupList, ((SScanLogicNode*)pSubplan->pNode)->pVgroupList, SVgroupsInfo*);
return pSubplan; return pSubplan;
} }
...@@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan ...@@ -244,24 +248,22 @@ static int32_t uaCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan
pSubplan->subplanType = SUBPLAN_TYPE_MERGE; pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pProject->node.pChildren, (SNode*)pExchange); if (NULL == pProject->node.pParent) {
pSubplan->pNode = (SLogicNode*)pExchange;
// if (NULL == pProject->node.pParent) { nodesDestroyNode(pProject);
// pSubplan->pNode = (SLogicNode*)pExchange; return TSDB_CODE_SUCCESS;
// nodesDestroyNode(pProject); }
// return TSDB_CODE_SUCCESS;
// } SNode* pNode;
FOREACH(pNode, pProject->node.pParent->pChildren) {
// SNode* pNode; if (nodesEqualNode(pNode, pProject)) {
// FOREACH(pNode, pProject->node.pParent->pChildren) { REPLACE_NODE(pExchange);
// if (nodesEqualNode(pNode, pProject)) { nodesDestroyNode(pNode);
// REPLACE_NODE(pExchange); return TSDB_CODE_SUCCESS;
// nodesDestroyNode(pNode); }
// return TSDB_CODE_SUCCESS; }
// } nodesDestroyNode(pExchange);
// } return TSDB_CODE_FAILED;
// nodesDestroyNode(pExchange);
// return TSDB_CODE_FAILED;
} }
static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
...@@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) { ...@@ -291,10 +293,78 @@ static int32_t uaSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code; return code;
} }
static SLogicNode* unMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = uaMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static int32_t unCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pExchange->srcGroupId = pCxt->groupId;
// pExchange->precision = pScan->pMeta->tableInfo.precision;
pExchange->node.pTargets = nodesCloneList(pAgg->node.pTargets);
if (NULL == pExchange->node.pTargets) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
return nodesListMakeAppend(&pAgg->node.pChildren, pExchange);
}
static bool unFindSplitNode(SLogicSubplan* pSubplan, SUnInfo* pInfo) {
SLogicNode* pSplitNode = unMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
pInfo->pSubplan = pSubplan;
}
return NULL != pSplitNode;
}
static int32_t unSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
SUnInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)unFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
FOREACH(pChild, info.pAgg->node.pChildren) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, uaCreateSubplan(pCxt, (SLogicNode*)pChild));
if (TSDB_CODE_SUCCESS == code) {
REPLACE_NODE(NULL);
} else {
break;
}
}
if (TSDB_CODE_SUCCESS == code) {
nodesClearList(info.pAgg->node.pChildren);
info.pAgg->node.pChildren = NULL;
code = unCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
static const SSplitRule splitRuleSet[] = { static const SSplitRule splitRuleSet[] = {
{ .pName = "SuperTableScan", .splitFunc = stsSplit }, { .pName = "SuperTableScan", .splitFunc = stsSplit },
{ .pName = "ChildTableJoin", .splitFunc = ctjSplit }, { .pName = "ChildTableJoin", .splitFunc = ctjSplit },
{ .pName = "UnionAll", .splitFunc = uaSplit }, { .pName = "UnionAll", .splitFunc = uaSplit },
{ .pName = "Union", .splitFunc = unSplit }
}; };
static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule)); static const int32_t splitRuleNum = (sizeof(splitRuleSet) / sizeof(SSplitRule));
......
...@@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) { ...@@ -27,3 +27,9 @@ TEST_F(PlanSetOpTest, unionAll) {
run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20"); run("select c1, c2 from t1 where c1 > 10 union all select c1, c2 from t1 where c1 > 20");
} }
TEST_F(PlanSetOpTest, union) {
useDb("root", "test");
run("select c1, c2 from t1 where c1 > 10 union select c1, c2 from t1 where c1 > 20");
}
...@@ -62,7 +62,7 @@ public: ...@@ -62,7 +62,7 @@ public:
doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan); doScaleOutLogicPlan(&cxt, pLogicSubplan, &pLogicPlan);
SQueryPlan* pPlan = nullptr; SQueryPlan* pPlan = nullptr;
doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan, NULL); doCreatePhysiPlan(&cxt, pLogicPlan, &pPlan);
if (g_isDump) { if (g_isDump) {
dump(); dump();
...@@ -162,7 +162,8 @@ private: ...@@ -162,7 +162,8 @@ private:
res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan)); res_.scaledLogicPlan_ = toString((SNode*)(*pLogicPlan));
} }
void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) { void doCreatePhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) {
SArray* pExecNodeList = taosArrayInit(TARRAY_MIN_SIZE, sizeof(SQueryNodeAddr));
DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList); DO_WITH_THROW(createPhysiPlan, pCxt, pLogicPlan, pPlan, pExecNodeList);
res_.physiPlan_ = toString((SNode*)(*pPlan)); res_.physiPlan_ = toString((SNode*)(*pPlan));
SNode* pNode; SNode* pNode;
......
...@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) { ...@@ -380,7 +380,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
#if FILE_WITH_LOCK #if FILE_WITH_LOCK
taosThreadRwlockWrlock(&(pFile->rwlock)); taosThreadRwlockWrlock(&(pFile->rwlock));
#endif #endif
/*assert(pFile->fd >= 0); // Please check if you have closed the file.*/ assert(pFile->fd >= 0); // Please check if you have closed the file.
int64_t nleft = count; int64_t nleft = count;
int64_t nwritten = 0; int64_t nwritten = 0;
......
...@@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size ...@@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
} }
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot) { __ext_compar_fn_t compar, char* buf, bool maxroot) {
int32_t parent; int32_t parent;
int32_t child; int32_t child;
char *buf;
char* tmp = NULL;
if (buf == NULL) {
tmp = taosMemoryMalloc(size);
} else {
tmp = buf;
}
if (base && size > 0 && compar) { if (base && size > 0 && compar) {
parent = start; parent = start;
child = 2 * parent + 1; child = 2 * parent + 1;
if (swap == NULL) {
buf = taosMemoryCalloc(1, size);
if (buf == NULL) {
return;
}
}
if (maxroot) { if (maxroot) {
while (child <= end) { while (child <= end) {
if (child + 1 <= end && if (child + 1 <= end &&
...@@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
break; break;
} }
if (swap == NULL) { doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
} else {
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
}
parent = child; parent = child;
child = 2 * parent + 1; child = 2 * parent + 1;
...@@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
break; break;
} }
if (swap == NULL) { doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
} else {
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
}
parent = child; parent = child;
child = 2 * parent + 1; child = 2 * parent + 1;
} }
} }
}
if (swap == NULL) { if (buf == NULL) {
taosMemoryFreeClear(buf); taosMemoryFree(tmp);
}
} }
} }
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
const void *parswap, __ext_swap_fn_t swap, bool maxroot) { bool maxroot) {
int32_t i; int32_t i;
char* buf = taosMemoryCalloc(1, size);
if (buf == NULL) {
return;
}
if (base && size > 0) { if (base && size > 0) {
for (i = len / 2 - 1; i >= 0; i--) { for (i = len / 2 - 1; i >= 0; i--) {
taosheapadjust(base, size, i, len - 1, parcompar, compar, parswap, swap, maxroot); taosheapadjust(base, size, i, len - 1, parcompar, compar, buf, maxroot);
} }
} }
taosMemoryFree(buf);
/* /*
char *buf = taosMemoryCalloc(1, size); char *buf = taosMemoryCalloc(1, size);
......
...@@ -222,7 +222,7 @@ static void *taosThreadToOpenNewFile(void *param) { ...@@ -222,7 +222,7 @@ static void *taosThreadToOpenNewFile(void *param) {
tsLogObj.logHandle->pFile = pFile; tsLogObj.logHandle->pFile = pFile;
tsLogObj.lines = 0; tsLogObj.lines = 0;
tsLogObj.openInProgress = 0; tsLogObj.openInProgress = 0;
taosSsleep(3); taosSsleep(10);
taosCloseLogByFd(pOldFile); taosCloseLogByFd(pOldFile);
uInfo(" new log file:%d is opened", tsLogObj.flag); uInfo(" new log file:%d is opened", tsLogObj.flag);
......
...@@ -279,7 +279,8 @@ endi ...@@ -279,7 +279,8 @@ endi
print ================> syntax error check not active ================> reactive print ================> syntax error check not active ================> reactive
sql_error select * from dev_001 session(ts,1w) sql_error select * from dev_001 session(ts,1w)
sql_error select count(*) from st session(ts,1w) print disable this temporarily, session can not be directly applied to super table.
#sql_error select count(*) from st session(ts,1w)
sql_error select count(*) from dev_001 group by tagtype session(ts,1w) sql_error select count(*) from dev_001 group by tagtype session(ts,1w)
sql_error sql select count(*) from dev_001 session(ts,1n) sql_error sql select count(*) from dev_001 session(ts,1n)
sql_error sql select count(*) from dev_001 session(ts,1y) sql_error sql select count(*) from dev_001 session(ts,1y)
......
...@@ -8,196 +8,245 @@ ...@@ -8,196 +8,245 @@
# #
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# #
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1 run tsim/tmq/prepareBasicEnv.sim
system sh/exec.sh -n dnode1 -s start
#---- global parameters start ----#
$loop_cnt = 0 $dbName = db
check_dnode_ready: $vgroups = 1
$loop_cnt = $loop_cnt + 1 $stbPrefix = stb
sleep 200 $ctbPrefix = ctb
if $loop_cnt == 10 then $ntbPrefix = ntb
print ====> dnode not ready! $stbNum = 1
return -1 $ctbNum = 10
endi $ntbNum = 10
sql show dnodes $rowsPerCtb = 100
print ===> $rows $data00 $data01 $data02 $data03 $data04 $data05 $tstart = 1640966400000 # 2022-01-01 00:00:00.000
if $data00 != 1 then #---- global parameters end ----#
return -1
endi $pullDelay = 3
if $data04 != ready then $ifcheckdata = 1
goto check_dnode_ready $showMsg = 1
endi $showRow = 0
sql connect sql connect
sql use $dbName
$dbNamme = d0 print == create topics from super table
print =============== create database , vgroup 4 sql create topic topic_stb_column as select ts, c3 from stb
sql create database $dbNamme vgroups 4 sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
sql use $dbNamme sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
print =============== create super table
sql create table if not exists stb (ts timestamp, c1 int, c2 float, c3 binary(10)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print =============== create child table
sql create table ct0 using stb tags(1000)
sql create table ct1 using stb tags(2000)
#sql create table ct3 using stb tags(3000)
print =============== create normal table print == create topics from child table
sql create table ntb (ts timestamp, c1 int, c2 float, c3 binary(10)) sql create topic topic_ctb_column as select ts, c3 from ctb0
sql create topic topic_ctb_all as select * from ctb0
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
print =============== create multi topics. notes: now only support: print == create topics from normal table
print =============== 1. columns from stb; 2. * from ctb; 3. columns from ctb sql create topic topic_ntb_column as select ts, c3 from ntb0
print =============== will support: * from stb; function from stb/ctb sql create topic topic_ntb_all as select * from ntb0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
sql create topic topic_stb_column as select ts, c1, c3 from stb #sql show topics
#sql create topic topic_stb_all as select * from stb #if $rows != 9 then
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb # return -1
#endi
sql create topic topic_ctb_column as select ts, c1, c3 from ct0 $keyList = ' . group.id:cgrp1
sql create topic topic_ctb_all as select * from ct0 $keyList = $keyList . '
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ct0
sql create topic topic_ntb_column as select ts, c1, c3 from ntb print ================ test consume from stb
sql create topic topic_ntb_all as select * from ntb $loop_cnt = 0
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb loop_consume_diff_topic_from_stb:
if $loop_cnt == 0 then
print == scenario 1: topic_stb_column
$topicList = ' . topic_stb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_stb_all
$topicList = ' . topic_stb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_stb_function
$topicList = ' . topic_stb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_stb_end
endi
sql show tables $consumerId = 0
if $rows != 3 then $totalMsgOfStb = $ctbNum * $rowsPerCtb
#$expectmsgcnt = $totalMsgOfStb + 1
$expectmsgcnt = 110
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
print == check consume result
wait_consumer_end_from_stb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_stb
endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1 return -1
endi endi
if $data[0][3] != $expectmsgcnt then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_stb
loop_consume_diff_topic_from_stb_end:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb1
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
print =============== insert data sql show tables
if $rows != 2 then
$tbPrefix = ct
$tbNum = 2
$rowNum = 10
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
$i = 0
while $i < $tbNum
$tb = $tbPrefix . $i
$x = 0
while $x < $rowNum
$c = $x / 10
$c = $c * 10
$c = $x - $c
$binary = ' . binary
$binary = $binary . $c
$binary = $binary . '
sql insert into $tb values ($tstart , $c , $x , $binary )
sql insert into ntb values ($tstart , $c , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
#root@trd02 /home $ tmq_sim --help
# -c Configuration directory, default is
# -d The name of the database for cosumer, no default
# -t The topic string for cosumer, no default
# -k The key-value string for cosumer, no default
# -g showMsgFlag, default is 0
#
$totalMsgCnt = $rowNum * $tbNum
print inserted totalMsgCnt: $totalMsgCnt
# supported key:
# group.id:<xxx>
# enable.auto.commit:<true | false>
# auto.offset.reset:<earliest | latest | none>
# td.connect.ip:<fqdn | ipaddress>
# td.connect.user:root
# td.connect.pass:taosdata
# td.connect.port:6030
# td.connect.db:db
system_content echo -n \$BUILD_DIR
$tmq_sim = $system_content . /build/bin/tmq_sim
system_content echo -n \$SIM_DIR
$tsim_cfg = $system_content . /tsim/cfg
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20, 0}@ then
return -1 return -1
endi endi
#######################################################################################
#print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#print cmd result----> $system_content
#if $system_content != @{consume success: 20, 0}@ then
# return -1
#endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" print ================ test consume from ctb
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_stb_function" -k "group.id:tg2" $loop_cnt = 0
print cmd result----> $system_content loop_consume_diff_topic_from_ctb:
if $system_content != @{consume success: 20, 0}@ then if $loop_cnt == 0 then
print expect @{consume success: 20, 0}@, actual: $system_content print == scenario 1: topic_ctb_column
return -1 $topicList = ' . topic_ctb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ctb_all
$topicList = ' . topic_ctb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ctb_function
$topicList = ' . topic_ctb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ctb_end
endi endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" $consumerId = 0
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" $totalMsgOfCtb = $rowsPerCtb
print cmd result----> $system_content $expectmsgcnt = $totalMsgOfCtb + 1
if $system_content != @{consume success: 10, 0}@ then sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result
wait_consumer_end_from_ctb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ctb
endi
if $data[0][1] != $consumerId then
return -1 return -1
endi endi
if $data[0][2] != $totalMsgOfCtb then
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10, 0}@ then
return -1 return -1
endi endi
if $data[0][3] != $totalMsgOfCtb then
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ctb_function" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10, 0}@ then
return -1 return -1
endi endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ctb
loop_consume_diff_topic_from_ctb_end:
#######################################################################################
# clear consume info and consume result
#run tsim/tmq/clearConsume.sim
# because drop table function no stable, so by create new db for consume info and result. Modify it later
$cdbName = cdb2
sql create database $cdbName vgroups 1
sleep 500
sql use $cdbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" sql show tables
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" if $rows != 2 then
print cmd result----> $system_content
if $system_content != @{consume success: 20, 0}@ then
return -1 return -1
endi endi
#######################################################################################
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" print ================ test consume from ntb
print cmd result----> $system_content $loop_cnt = 0
if $system_content != @{consume success: 20, 0}@ then loop_consume_diff_topic_from_ntb:
return -1 if $loop_cnt == 0 then
print == scenario 1: topic_ntb_column
$topicList = ' . topic_ntb_column
$topicList = $topicList . '
elif $loop_cnt == 1 then
print == scenario 2: topic_ntb_all
$topicList = ' . topic_ntb_all
$topicList = $topicList . '
elif $loop_cnt == 2 then
print == scenario 3: topic_ntb_function
$topicList = ' . topic_ntb_function
$topicList = $topicList . '
else
goto loop_consume_diff_topic_from_ntb_end
endi endi
print cmd===> system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" $consumerId = 0
system_content $tmq_sim -c $tsim_cfg -d $dbNamme -t "topic_ntb_function" -k "group.id:tg2" $totalMsgOfNtb = $rowsPerCtb
print cmd result----> $system_content $expectmsgcnt = $totalMsgOfNtb + 1
if $system_content != @{consume success: 20, 0}@ then sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
print == start consumer to pull msgs from stb
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
print == check consume result from ntb
wait_consumer_end_from_ntb:
sql select * from consumeresult
print ==> rows: $rows
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
if $rows != 1 then
sleep 1000
goto wait_consumer_end_from_ntb
endi
if $data[0][1] != $consumerId then
return -1 return -1
endi endi
if $data[0][2] != $totalMsgOfNtb then
return -1
endi
if $data[0][3] != $totalMsgOfNtb then
return -1
endi
$loop_cnt = $loop_cnt + 1
goto loop_consume_diff_topic_from_ntb
loop_consume_diff_topic_from_ntb_end:
print =============== create database , vgroup 4 #------ not need stop consumer, because it exit after pull msg overthan expect msg
$dbNamme = d1 #system tsim/tmq/consume.sh -s stop -x SIGINT
sql create database $dbNamme vgroups 4
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
sql connect
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
sql use $dbName
print == create consume info table and consume result table
sql drop table consumeinfo
sql drop table consumeresult
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
...@@ -11,16 +11,25 @@ set +e ...@@ -11,16 +11,25 @@ set +e
# set default value for parameters # set default value for parameters
EXEC_OPTON=start EXEC_OPTON=start
DB_NAME=db DB_NAME=db
CDB_NAME=db
POLL_DELAY=5 POLL_DELAY=5
VALGRIND=0 VALGRIND=0
SIGNAL=SIGINT SIGNAL=SIGINT
SHOW_MSG=0
SHOW_ROW=0
while getopts "d:s:v:y:x:" arg while getopts "d:s:v:y:x:g:r:w:" arg
do do
case $arg in case $arg in
d) d)
DB_NAME=$OPTARG DB_NAME=$OPTARG
;; ;;
g)
SHOW_MSG=$OPTARG
;;
r)
SHOW_ROW=$OPTARG
;;
s) s)
EXEC_OPTON=$OPTARG EXEC_OPTON=$OPTARG
;; ;;
...@@ -33,6 +42,9 @@ do ...@@ -33,6 +42,9 @@ do
x) x)
SIGNAL=$OPTARG SIGNAL=$OPTARG
;; ;;
w)
CDB_NAME=$OPTARG
;;
?) ?)
echo "unkown argument" echo "unkown argument"
;; ;;
...@@ -80,11 +92,11 @@ echo "DB_NAME: $DB_NAME ...@@ -80,11 +92,11 @@ echo "DB_NAME: $DB_NAME
echo "------------------------------------------------------------------------" echo "------------------------------------------------------------------------"
if [ "$EXEC_OPTON" = "start" ]; then if [ "$EXEC_OPTON" = "start" ]; then
if [ $VALGRIND -eq 1 ]; then if [ $VALGRIND -eq 1 ]; then
echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & echo nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 & nohup valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tmq_sim.log $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW > /dev/null 2>&1 &
else else
echo "nohup $PROGRAM -c $CFG_DIR -d $DB_NAME -y $POLL_DELAY > /dev/null 2>&1 &" echo "nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &"
nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME > /dev/null 2>&1 & nohup $PROGRAM -c $CFG_DIR -y $POLL_DELAY -d $DB_NAME -g $SHOW_MSG -r $SHOW_ROW -w $CDB_NAME > /dev/null 2>&1 &
fi fi
else else
PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'` PID=`ps -ef|grep tmq_sim | grep -v grep | awk '{print $2}'`
......
# stop all dnodes before start this case
system sh/stop_dnodes.sh
# deploy dnode 1
system sh/deploy.sh -n dnode1 -i 1
# add some config items for this case
#system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
# start dnode 1
system sh/exec.sh -n dnode1 -s start
sql connect
#---- global parameters start ----#
$dbName = db
$vgroups = 1
$stbPrefix = stb
$ctbPrefix = ctb
$ntbPrefix = ntb
$stbNum = 1
$ctbNum = 10
$ntbNum = 10
$rowsPerCtb = 100
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
#---- global parameters end ----#
print == create database $dbName vgroups $vgroups
sql create database $dbName vgroups $vgroups
#wait database ready
$loop_cnt = 0
check_db_ready:
if $loop_cnt == 10 then
print ====> database not ready!
return -1
endi
sql show databases
print ==> rows: $rows
print ==> $data(db)[0] $data(db)[1] $data(db)[2] $data(db)[3] $data(db)[4] $data(db)[5] $data(db)[6] $data(db)[7] $data(db)[8] $data(db)[9] $data(db)[10] $data(db)[11] $data(db)[12]
print $data(db)[13] $data(db)[14] $data(db)[15] $data(db)[16] $data(db)[17] $data(db)[18] $data(db)[19] $data(db)[20]
if $data(db)[20] != nostrict then
sleep 100
$loop_cnt = $loop_cnt + 1
goto check_db_ready
endi
sql use $dbName
print == create consume info table and consume result table
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
sql show tables
if $rows != 2 then
return -1
endi
print == create super table
sql create table $stbPrefix (ts timestamp, c1 int, c2 float, c3 binary(16)) tags (t1 int)
sql show stables
if $rows != 1 then
return -1
endi
print == create child table, normal table and insert data
$i = 0
while $i < $ctbNum
$ctb = $ctbPrefix . $i
$ntb = $ntbPrefix . $i
sql create table $ctb using $stbPrefix tags( $i )
sql create table $ntb (ts timestamp, c1 int, c2 float, c3 binary(16))
$x = 0
while $x < $rowsPerCtb
$binary = ' . binary-
$binary = $binary . $i
$binary = $binary . '
sql insert into $ctb values ($tstart , $i , $x , $binary )
sql insert into $ntb values ($tstart , $i , $x , $binary )
$tstart = $tstart + 1
$x = $x + 1
endw
$i = $i + 1
$tstart = 1640966400000
endw
...@@ -31,46 +31,49 @@ ...@@ -31,46 +31,49 @@
#define NC "\033[0m" #define NC "\033[0m"
#define min(a, b) (((a) < (b)) ? (a) : (b)) #define min(a, b) (((a) < (b)) ? (a) : (b))
#define MAX_SQL_STR_LEN (1024 * 1024) #define MAX_SQL_STR_LEN (1024 * 1024)
#define MAX_ROW_STR_LEN (16 * 1024) #define MAX_ROW_STR_LEN (16 * 1024)
#define MAX_CONSUMER_THREAD_CNT (16) #define MAX_CONSUMER_THREAD_CNT (16)
typedef struct { typedef struct {
TdThread thread; TdThread thread;
int32_t consumerId; int32_t consumerId;
int32_t ifCheckData; int32_t ifCheckData;
int64_t expectMsgCnt; int64_t expectMsgCnt;
int64_t consumeMsgCnt;
int64_t consumeRowCnt;
int32_t checkresult;
int64_t consumeMsgCnt; char topicString[1024];
int32_t checkresult; char keyString[1024];
char topicString[1024]; int32_t numOfTopic;
char keyString[1024]; char topics[32][64];
int32_t numOfTopic; int32_t numOfKey;
char topics[32][64]; char key[32][64];
char value[32][64];
int32_t numOfKey;
char key[32][64];
char value[32][64];
tmq_t* tmq; tmq_t* tmq;
tmq_list_t* topicList; tmq_list_t* topicList;
} SThreadInfo; } SThreadInfo;
typedef struct { typedef struct {
// input from argvs // input from argvs
char dbName[32]; char cdbName[32];
int32_t showMsgFlag; char dbName[32];
int32_t consumeDelay; // unit s int32_t showMsgFlag;
int32_t numOfThread; int32_t showRowFlag;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; int32_t consumeDelay; // unit s
int32_t numOfThread;
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
} SConfInfo; } SConfInfo;
static SConfInfo g_stConfInfo; static SConfInfo g_stConfInfo;
TdFilePtr g_fp = NULL; TdFilePtr g_fp = NULL;
// char* g_pRowValue = NULL; // char* g_pRowValue = NULL;
// TdFilePtr g_fp = NULL; // TdFilePtr g_fp = NULL;
...@@ -85,51 +88,62 @@ static void printHelp() { ...@@ -85,51 +88,62 @@ static void printHelp() {
printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default "); printf("%s%s%s\n", indent, indent, "The name of the database for cosumer, no default ");
printf("%s%s\n", indent, "-g"); printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-r");
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
printf("%s%s\n", indent, "-y"); printf("%s%s\n", indent, "-y");
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
void initLogFile() { void initLogFile() {
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); // FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
TdFilePtr pFile = taosOpenFile("./tmqlog.txt", TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); char file[256];
sprintf(file, "%s/../log/tmqlog.txt", configDir);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
if (NULL == pFile) { if (NULL == pFile) {
fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt"); fprintf(stderr, "Failed to open %s for save result\n", "./tmqlog.txt");
exit - 1; exit -1;
}; };
g_fp = pFile; g_fp = pFile;
}
void saveConfigToLogFile() {
time_t tTime = taosGetTimestampSec(); time_t tTime = taosGetTimestampSec();
struct tm tm = *taosLocalTime(&tTime, NULL); struct tm tm = *taosLocalTime(&tTime, NULL);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
taosFprintfFile(pFile, "# configDir: %s\n", configDir); taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
taosFprintfFile(pFile, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag); taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
taosFprintfFile(pFile, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay); taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
taosFprintfFile(pFile, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, " Topics: "); for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfTopic; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
taosFprintfFile(pFile, "%s, ", g_stConfInfo.stThreads[i].topics[i]); taosFprintfFile(g_fp, " Topics: ");
for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfTopic; i++) {
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(g_fp, "\n");
taosFprintfFile(pFile, " Key: "); taosFprintfFile(g_fp, " Key: ");
for (int i = 0; i < g_stConfInfo.stThreads[i].numOfKey; i++) { for (int i = 0 ; i < g_stConfInfo.stThreads[i].numOfKey; i++) {
taosFprintfFile(pFile, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]); taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[i], g_stConfInfo.stThreads[i].value[i]);
} }
taosFprintfFile(pFile, "\n"); taosFprintfFile(g_fp, "\n");
} }
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
taosFprintfFile(pFile, "###################################################################\n"); taosFprintfFile(g_fp, "###################################################################\n");
} }
void parseArgument(int32_t argc, char* argv[]) { void parseArgument(int32_t argc, char* argv[]) {
memset(&g_stConfInfo, 0, sizeof(SConfInfo)); memset(&g_stConfInfo, 0, sizeof(SConfInfo));
g_stConfInfo.showMsgFlag = 0; g_stConfInfo.showMsgFlag = 0;
g_stConfInfo.showRowFlag = 0;
g_stConfInfo.consumeDelay = 5; g_stConfInfo.consumeDelay = 5;
for (int32_t i = 1; i < argc; i++) { for (int32_t i = 1; i < argc; i++) {
...@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -138,10 +152,14 @@ void parseArgument(int32_t argc, char* argv[]) {
exit(0); exit(0);
} else if (strcmp(argv[i], "-d") == 0) { } else if (strcmp(argv[i], "-d") == 0) {
strcpy(g_stConfInfo.dbName, argv[++i]); strcpy(g_stConfInfo.dbName, argv[++i]);
} else if (strcmp(argv[i], "-w") == 0) {
strcpy(g_stConfInfo.cdbName, argv[++i]);
} else if (strcmp(argv[i], "-c") == 0) { } else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]); strcpy(configDir, argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) { } else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]); g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
g_stConfInfo.showRowFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-y") == 0) { } else if (strcmp(argv[i], "-y") == 0) {
g_stConfInfo.consumeDelay = atol(argv[++i]); g_stConfInfo.consumeDelay = atol(argv[++i]);
} else { } else {
...@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) { ...@@ -150,11 +168,17 @@ void parseArgument(int32_t argc, char* argv[]) {
} }
} }
initLogFile();
taosFprintfFile(g_fp, "====parseArgument() success\n");
#if 1 #if 1
pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s cdbName:%s %s", GREEN, g_stConfInfo.cdbName, NC);
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC); pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
#endif #endif
} }
...@@ -180,24 +204,29 @@ void ltrim(char* str) { ...@@ -180,24 +204,29 @@ void ltrim(char* str) {
// return str; // return str;
} }
static int running = 1; static int running = 1;
static void msg_process(TAOS_RES* msg, int32_t msgIndex, int32_t threadLable) { static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
char buf[1024]; char buf[1024];
int32_t totalRows = 0;
// printf("topic: %s\n", tmq_get_topic_name(msg)); //printf("topic: %s\n", tmq_get_topic_name(msg));
// printf("vg:%d\n", tmq_get_vgroup_id(msg)); //printf("vg:%d\n", tmq_get_vgroup_id(msg));
taosFprintfFile(g_fp, "msg index:%d, threadLable: %d\n", msgIndex, threadLable); taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg)); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
while (1) { while (1) {
TAOS_ROW row = taos_fetch_row(msg); TAOS_ROW row = taos_fetch_row(msg);
if (row == NULL) break; if (row == NULL) break;
TAOS_FIELD* fields = taos_fetch_fields(msg); if (0 != g_stConfInfo.showRowFlag) {
int32_t numOfFields = taos_field_count(msg); TAOS_FIELD* fields = taos_fetch_fields(msg);
// taos_print_row(buf, row, fields, numOfFields); int32_t numOfFields = taos_field_count(msg);
// printf("%s\n", buf); taos_print_row(buf, row, fields, numOfFields);
// taosFprintfFile(g_fp, "%s\n", buf); taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
}
totalRows++;
} }
return totalRows;
} }
int queryDB(TAOS* taos, char* command) { int queryDB(TAOS* taos, char* command) {
...@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) { ...@@ -213,31 +242,43 @@ int queryDB(TAOS* taos, char* command) {
return 0; return 0;
} }
void build_consumer(SThreadInfo* pInfo) { static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets) {
char sqlStr[1024] = {0}; printf("tmq_commit_cb_print() commit %d\n", resp);
}
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); void build_consumer(SThreadInfo *pInfo) {
assert(pConn != NULL); tmq_conf_t* conf = tmq_conf_new();
sprintf(sqlStr, "use %s", g_stConfInfo.dbName); //tmq_conf_set(conf, "td.connect.ip", "localhost");
TAOS_RES* pRes = taos_query(pConn, sqlStr); //tmq_conf_set(conf, "td.connect.port", "6030");
if (taos_errno(pRes) != 0) { tmq_conf_set(conf, "td.connect.user", "root");
printf("error in use db, reason:%s\n", taos_errstr(pRes)); tmq_conf_set(conf, "td.connect.pass", "taosdata");
taos_free_result(pRes);
exit(-1);
}
taos_free_result(pRes);
tmq_conf_t* conf = tmq_conf_new(); tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
// tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set_offset_commit_cb(conf, tmq_commit_cb_print);
// tmq_conf_set(conf, "group.id", "cgrp1");
for (int32_t i = 0; i < pInfo->numOfKey; i++) { for (int32_t i = 0; i < pInfo->numOfKey; i++) {
tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]); tmq_conf_set(conf, pInfo->key[i], pInfo->value[i]);
} }
//tmq_conf_set(conf, "client.id", "c-001");
//tmq_conf_set(conf, "enable.auto.commit", "true");
//tmq_conf_set(conf, "enable.auto.commit", "false");
//tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
//tmq_conf_set(conf, "auto.offset.reset", "none");
//tmq_conf_set(conf, "auto.offset.reset", "earliest");
//tmq_conf_set(conf, "auto.offset.reset", "latest");
pInfo->tmq = tmq_consumer_new(conf, NULL, 0); pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
return; return;
} }
void build_topic_list(SThreadInfo* pInfo) { void build_topic_list(SThreadInfo *pInfo) {
pInfo->topicList = tmq_list_new(); pInfo->topicList = tmq_list_new();
// tmq_list_append(topic_list, "test_stb_topic_1"); // tmq_list_append(topic_list, "test_stb_topic_1");
for (int32_t i = 0; i < pInfo->numOfTopic; i++) { for (int32_t i = 0; i < pInfo->numOfTopic; i++) {
...@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) { ...@@ -246,45 +287,49 @@ void build_topic_list(SThreadInfo* pInfo) {
return; return;
} }
int32_t saveConsumeResult(SThreadInfo* pInfo) { int32_t saveConsumeResult(SThreadInfo *pInfo) {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %d)", g_stConfInfo.dbName, sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)",
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->checkresult); g_stConfInfo.cdbName,
pInfo->consumerId,
pInfo->consumeMsgCnt,
pInfo->consumeRowCnt,
pInfo->checkresult);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
return 0; return 0;
} }
void loop_consume(SThreadInfo* pInfo) { void loop_consume(SThreadInfo *pInfo) {
tmq_resp_err_t err; tmq_resp_err_t err;
int64_t totalMsgs = 0; int64_t totalMsgs = 0;
// int64_t totalRows = 0; int64_t totalRows = 0;
while (running) { while (running) {
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000); TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
if (tmqMsg) { if (tmqMsg) {
if (0 != g_stConfInfo.showMsgFlag) { if (0 != g_stConfInfo.showMsgFlag) {
msg_process(tmqMsg, totalMsgs, 0); totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
} }
taos_free_result(tmqMsg); taos_free_result(tmqMsg);
totalMsgs++; totalMsgs++;
if (totalMsgs >= pInfo->expectMsgCnt) { if (totalMsgs >= pInfo->expectMsgCnt) {
break; break;
} }
...@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -292,7 +337,7 @@ void loop_consume(SThreadInfo* pInfo) {
break; break;
} }
} }
err = tmq_consumer_close(pInfo->tmq); err = tmq_consumer_close(pInfo->tmq);
if (err) { if (err) {
printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
...@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) { ...@@ -300,34 +345,38 @@ void loop_consume(SThreadInfo* pInfo) {
} }
pInfo->consumeMsgCnt = totalMsgs; pInfo->consumeMsgCnt = totalMsgs;
pInfo->consumeRowCnt = totalRows;
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %"PRId64", consumeRowCnt: %"PRId64"\n", pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
} }
void* consumeThreadFunc(void* param) { void *consumeThreadFunc(void *param) {
int32_t totalMsgs = 0; int32_t totalMsgs = 0;
SThreadInfo* pInfo = (SThreadInfo*)param; SThreadInfo *pInfo = (SThreadInfo *)param;
build_consumer(pInfo); build_consumer(pInfo);
build_topic_list(pInfo); build_topic_list(pInfo);
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)){
return NULL; return NULL;
} }
tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
if (err) { if (err) {
printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1); exit(-1);
} }
loop_consume(pInfo); loop_consume(pInfo);
err = tmq_unsubscribe(pInfo->tmq); err = tmq_unsubscribe(pInfo->tmq);
if (err) { if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
pInfo->consumeMsgCnt = -1; pInfo->consumeMsgCnt = -1;
return NULL; return NULL;
} }
// save consume result into consumeresult table // save consume result into consumeresult table
saveConsumeResult(pInfo); saveConsumeResult(pInfo);
...@@ -339,7 +388,7 @@ void parseConsumeInfo() { ...@@ -339,7 +388,7 @@ void parseConsumeInfo() {
const char delim[2] = ","; const char delim[2] = ",";
const char ch = ':'; const char ch = ':';
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
token = strtok(g_stConfInfo.stThreads[i].topicString, delim); token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -347,10 +396,10 @@ void parseConsumeInfo() { ...@@ -347,10 +396,10 @@ void parseConsumeInfo() {
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]); ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
g_stConfInfo.stThreads[i].numOfTopic++; g_stConfInfo.stThreads[i].numOfTopic++;
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
token = strtok(g_stConfInfo.stThreads[i].keyString, delim); token = strtok(g_stConfInfo.stThreads[i].keyString, delim);
while (token != NULL) { while (token != NULL) {
// printf("%s\n", token ); // printf("%s\n", token );
...@@ -364,7 +413,7 @@ void parseConsumeInfo() { ...@@ -364,7 +413,7 @@ void parseConsumeInfo() {
// g_stConfInfo.value[g_stConfInfo.numOfKey]); // g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++; g_stConfInfo.stThreads[i].numOfKey++;
} }
token = strtok(NULL, delim); token = strtok(NULL, delim);
} }
} }
...@@ -372,47 +421,48 @@ void parseConsumeInfo() { ...@@ -372,47 +421,48 @@ void parseConsumeInfo() {
int32_t getConsumeInfo() { int32_t getConsumeInfo() {
char sqlStr[1024] = {0}; char sqlStr[1024] = {0};
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
assert(pConn != NULL); assert(pConn != NULL);
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.dbName); sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes)); printf("error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosFprintfFile(g_fp, "error in get consumeinfo, reason:%s\n", taos_errstr(pRes));
taosCloseFile(&g_fp);
taos_free_result(pRes); taos_free_result(pRes);
exit(-1); exit(-1);
} }
TAOS_ROW row = NULL; TAOS_ROW row = NULL;
int num_fields = taos_num_fields(pRes); int num_fields = taos_num_fields(pRes);
TAOS_FIELD* fields = taos_fetch_fields(pRes); TAOS_FIELD* fields = taos_fetch_fields(pRes);
// schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, // schema: ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int
// ifcheckdata int
int32_t numOfThread = 0; int32_t numOfThread = 0;
while ((row = taos_fetch_row(pRes))) { while ((row = taos_fetch_row(pRes))) {
int32_t* lengths = taos_fetch_lengths(pRes); int32_t* lengths = taos_fetch_lengths(pRes);
for (int i = 0; i < num_fields; ++i) { for (int i = 0; i < num_fields; ++i) {
if (row[i] == NULL || 0 == i) { if (row[i] == NULL || 0 == i) {
continue; continue;
} }
if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { if ((1 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t*)row[i]); g_stConfInfo.stThreads[numOfThread].consumerId = *((int32_t *)row[i]);
} else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((2 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].topicString, row[i], lengths[i]);
} else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { } else if ((3 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]); memcpy(g_stConfInfo.stThreads[numOfThread].keyString, row[i], lengths[i]);
} else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) { } else if ((4 == i) && (fields[i].type == TSDB_DATA_TYPE_BIGINT)) {
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t *)row[i]);
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t *)row[i]);
} }
} }
numOfThread++; numOfThread ++;
} }
g_stConfInfo.numOfThread = numOfThread; g_stConfInfo.numOfThread = numOfThread;
...@@ -423,10 +473,11 @@ int32_t getConsumeInfo() { ...@@ -423,10 +473,11 @@ int32_t getConsumeInfo() {
return 0; return 0;
} }
int main(int32_t argc, char* argv[]) { int main(int32_t argc, char* argv[]) {
parseArgument(argc, argv); parseArgument(argc, argv);
getConsumeInfo(); getConsumeInfo();
initLogFile(); saveConfigToLogFile();
TdThreadAttr thattr; TdThreadAttr thattr;
taosThreadAttrInit(&thattr); taosThreadAttrInit(&thattr);
...@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) { ...@@ -434,19 +485,18 @@ int main(int32_t argc, char* argv[]) {
// pthread_create one thread to consume // pthread_create one thread to consume
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, consumeThreadFunc, (void *)(&(g_stConfInfo.stThreads[i])));
(void*)(&(g_stConfInfo.stThreads[i])));
} }
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
} }
// printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt); //printf("consumer: %d, cosumer1: %d\n", totalMsgs, pInfo->consumeMsgCnt);
taosFprintfFile(g_fp, "\n"); taosFprintfFile(g_fp, "\n");
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
return 0; return 0;
} }
...@@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) { ...@@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) {
} }
} }
char quote = 0, *cmd = command; bool esc = false;
char quote = 0, *cmd = command, *p = command;
for (char c = *command++; c != 0; c = *command++) { for (char c = *command++; c != 0; c = *command++) {
if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { if (esc) {
command ++; switch (c) {
case 'n':
c = '\n';
break;
case 'r':
c = '\r';
break;
case 't':
c = '\t';
break;
case 'G':
*p++ = '\\';
break;
case '\'':
case '"':
if (quote) {
*p++ = '\\';
}
break;
}
*p++ = c;
esc = false;
continue; continue;
} }
if (c == '\\') {
if (quote != 0 && (*command == '_' || *command == '\\')) {
// DO nothing
} else {
esc = true;
continue;
}
}
if (quote == c) { if (quote == c) {
quote = 0; quote = 0;
} else if (quote == 0 && (c == '\'' || c == '"' || c == '`')) { } else if (quote == 0 && (c == '\'' || c == '"')) {
quote = c; quote = c;
} else if (c == ';' && quote == 0) { }
c = *command;
*command = 0; *p++ = c;
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) { if (shellRunSingleCommand(con, cmd) < 0) {
return -1; return -1;
} }
*command = c; *p = c;
cmd = command; p = cmd;
} }
} }
*p = 0;
return shellRunSingleCommand(con, cmd); return shellRunSingleCommand(con, cmd);
} }
...@@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) { ...@@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) {
while (pos < length) { while (pos < length) {
TdWchar wc; TdWchar wc;
int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
if (bytes <= 0) { if (bytes == 0) {
break; break;
} }
if (pos + bytes > length) { pos += bytes;
if (pos > length) {
break; break;
} }
int w = 0;
#ifdef WINDOWS #ifdef WINDOWS
w = bytes; int w = bytes;
#else #else
if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){ int w = taosWcharWidth(wc);
w = bytes;
}else{
w = taosWcharWidth(wc);
}
#endif #endif
pos += bytes;
if (w <= 0) { if (w <= 0) {
continue; continue;
} }
...@@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le ...@@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width); shellPrintNChar(val, length, width);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) { ...@@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) {
return TMAX(field->bytes, width); return TMAX(field->bytes, width);
} }
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_JSON:{
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE; int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > tsMaxBinaryDisplayWidth) { if (bytes > tsMaxBinaryDisplayWidth) {
return TMAX(tsMaxBinaryDisplayWidth, width); return TMAX(tsMaxBinaryDisplayWidth, width);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册