提交 44c5e42f 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/vnode_refact1

...@@ -88,12 +88,6 @@ def pre_test(){ ...@@ -88,12 +88,6 @@ def pre_test(){
cmake .. > /dev/null cmake .. > /dev/null
make -j4> /dev/null make -j4> /dev/null
''' '''
sh'''
cd ${WKPY}
git reset --hard
git pull
pip3 install .
'''
return 1 return 1
} }
...@@ -103,7 +97,6 @@ pipeline { ...@@ -103,7 +97,6 @@ pipeline {
environment{ environment{
WK = '/var/lib/jenkins/workspace/TDinternal' WK = '/var/lib/jenkins/workspace/TDinternal'
WKC= '/var/lib/jenkins/workspace/TDengine' WKC= '/var/lib/jenkins/workspace/TDengine'
WKPY= '/var/lib/jenkins/workspace/taos-connector-python'
} }
stages { stages {
stage('pre_build'){ stage('pre_build'){
...@@ -124,11 +117,6 @@ pipeline { ...@@ -124,11 +117,6 @@ pipeline {
./test-all.sh b1fq ./test-all.sh b1fq
''' '''
sh''' sh'''
export LD_LIBRARY_PATH=${WKC}/debug/build/lib
cd ${WKC}/tests/system-test
./fulltest.sh
'''
sh'''
cd ${WKC}/debug cd ${WKC}/debug
ctest ctest
''' '''
......
...@@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp); ...@@ -326,6 +326,13 @@ int32_t tDecodeSEpSet(SCoder* pDecoder, SEpSet* pEp);
int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp); int32_t taosEncodeSEpSet(void** buf, const SEpSet* pEp);
void* taosDecodeSEpSet(const void* buf, SEpSet* pEp); void* taosDecodeSEpSet(const void* buf, SEpSet* pEp);
typedef struct {
SEpSet epSet;
} SMEpSet;
int32_t tSerializeSMEpSet(void* buf, int32_t bufLen, SMEpSet* pReq);
int32_t tDeserializeSMEpSet(void* buf, int32_t buflen, SMEpSet* pReq);
typedef struct { typedef struct {
int8_t connType; int8_t connType;
int32_t pid; int32_t pid;
...@@ -984,7 +991,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp); ...@@ -984,7 +991,6 @@ int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
void tFreeSShowRsp(SShowRsp* pRsp); void tFreeSShowRsp(SShowRsp* pRsp);
typedef struct { typedef struct {
int32_t type;
char db[TSDB_DB_FNAME_LEN]; char db[TSDB_DB_FNAME_LEN];
char tb[TSDB_TABLE_NAME_LEN]; char tb[TSDB_TABLE_NAME_LEN];
int64_t showId; int64_t showId;
...@@ -2753,6 +2759,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p ...@@ -2753,6 +2759,7 @@ static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* p
} }
return buf; return buf;
} }
#pragma pack(pop) #pragma pack(pop)
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData { ...@@ -166,6 +166,7 @@ typedef struct SInputColumnInfoData {
SColumnInfoData *pPTS; // primary timestamp column SColumnInfoData *pPTS; // primary timestamp column
SColumnInfoData **pData; SColumnInfoData **pData;
SColumnDataAgg **pColumnDataAgg; SColumnDataAgg **pColumnDataAgg;
uint64_t uid; // table uid
} SInputColumnInfoData; } SInputColumnInfoData;
// sql function runtime context // sql function runtime context
...@@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx { ...@@ -191,7 +192,7 @@ typedef struct SqlFunctionCtx {
int16_t functionId; // function id int16_t functionId; // function id
char * pOutput; // final result output buffer, point to sdata->data char * pOutput; // final result output buffer, point to sdata->data
int32_t numOfParams; int32_t numOfParams;
SVariant param[4]; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param SFunctParam *param; // input parameter, e.g., top(k, 20), the number of results for top query is kept in param
int64_t *ptsList; // corresponding timestamp array list int64_t *ptsList; // corresponding timestamp array list
SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/ SColumnInfoData *pTsOutput; // corresponding output buffer for timestamp of each result, e.g., top/bottom*/
int32_t offset; int32_t offset;
......
...@@ -54,12 +54,13 @@ typedef struct { ...@@ -54,12 +54,13 @@ typedef struct {
uint16_t clientPort; uint16_t clientPort;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
int32_t rspLen; int32_t rspLen;
void *pRsp; void * pRsp;
void *pNode; void * pNode;
} SNodeMsg; } SNodeMsg;
typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *); typedef void (*RpcCfp)(void *parent, SRpcMsg *, SEpSet *);
typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey); typedef int (*RpcAfp)(void *parent, char *tableId, char *spi, char *encrypt, char *secret, char *ckey);
typedef int (*RpcRfp)(void *parent, SRpcMsg *, SEpSet *);
typedef struct SRpcInit { typedef struct SRpcInit {
uint16_t localPort; // local port uint16_t localPort; // local port
...@@ -80,22 +81,25 @@ typedef struct SRpcInit { ...@@ -80,22 +81,25 @@ typedef struct SRpcInit {
RpcCfp cfp; RpcCfp cfp;
// call back to retrieve the client auth info, for server app only // call back to retrieve the client auth info, for server app only
RpcAfp afp;; RpcAfp afp;
// user defined retry func
RpcRfp rfp;
void *parent; void *parent;
} SRpcInit; } SRpcInit;
typedef struct { typedef struct {
void *val; void *val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcCtxVal; } SRpcCtxVal;
typedef struct { typedef struct {
int32_t msgType; int32_t msgType;
void *val; void * val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcBrokenlinkVal; } SRpcBrokenlinkVal;
typedef struct { typedef struct {
......
...@@ -34,6 +34,7 @@ extern int64_t tsOpenMax; ...@@ -34,6 +34,7 @@ extern int64_t tsOpenMax;
extern int64_t tsStreamMax; extern int64_t tsStreamMax;
extern float tsNumOfCores; extern float tsNumOfCores;
extern int64_t tsTotalMemoryKB; extern int64_t tsTotalMemoryKB;
extern char* tsProcPath;
extern char configDir[]; extern char configDir[];
extern char tsDataDir[]; extern char tsDataDir[];
......
...@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size ...@@ -82,7 +82,7 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
* @return * @return
*/ */
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot); __ext_compar_fn_t compar, char* buf, bool maxroot);
/** /**
* sort heap to make sure it is a max/min root heap * sort heap to make sure it is a max/min root heap
...@@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -98,7 +98,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
* @return * @return
*/ */
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
const void *parswap, __ext_swap_fn_t swap, bool maxroot); bool maxroot);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) { ...@@ -662,7 +662,7 @@ TEST(testCase, agg_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "select count(*) from tu"); pRes = taos_query(pConn, "select now() from m1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to select from table, reason:%s\n", taos_errstr(pRes)); printf("failed to select from table, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes); taos_free_result(pRes);
......
...@@ -892,6 +892,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) { ...@@ -892,6 +892,27 @@ void tFreeSMAltertbReq(SMAltertbReq *pReq) {
taosArrayDestroy(pReq->pFields); taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL; pReq->pFields = NULL;
} }
int32_t tSerializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder encoder = {0};
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeSEpSet(&encoder, &pReq->epSet) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tCoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSMEpSet(void *buf, int32_t bufLen, SMEpSet *pReq) {
SCoder decoder = {0};
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSEpSet(&decoder, &pReq->epSet) < 0) return -1;
tEndDecode(&decoder);
tCoderClear(&decoder);
return 0;
}
int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) { int32_t tSerializeSMCreateSmaReq(void *buf, int32_t bufLen, SMCreateSmaReq *pReq) {
SCoder encoder = {0}; SCoder encoder = {0};
...@@ -2489,7 +2510,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq ...@@ -2489,7 +2510,6 @@ int32_t tSerializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableReq
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI64(&encoder, pReq->showId) < 0) return -1; if (tEncodeI64(&encoder, pReq->showId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->type) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1; if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1; if (tEncodeCStr(&encoder, pReq->tb) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
...@@ -2505,7 +2525,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR ...@@ -2505,7 +2525,6 @@ int32_t tDeserializeSRetrieveTableReq(void *buf, int32_t bufLen, SRetrieveTableR
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1; if (tDecodeI64(&decoder, &pReq->showId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->type) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1; if (tDecodeCStrTo(&decoder, pReq->tb) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
......
...@@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) { ...@@ -216,6 +216,126 @@ static void dmStopMgmt(SMgmtWrapper *pWrapper) {
dmStopStatusThread(pWrapper->pDnode); dmStopStatusThread(pWrapper->pDnode);
} }
static int32_t dmSpawnUdfd(SDnode *pDnode);
void dmUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) {
dInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal);
uv_close((uv_handle_t*)process, NULL);
SDnode *pDnode = process->data;
SUdfdData *pData = &pDnode->udfdData;
if (atomic_load_8(&pData->stopping) != 0) {
dDebug("udfd process exit due to stopping");
} else {
uv_close((uv_handle_t*)&pData->ctrlPipe, NULL);
dmSpawnUdfd(pDnode);
}
}
static int32_t dmSpawnUdfd(SDnode *pDnode) {
dInfo("dnode start spawning udfd");
uv_process_options_t options = {0};
char path[PATH_MAX] = {0};
if (tsProcPath == NULL) {
path[0] = '.';
} else {
strncpy(path, tsProcPath, strlen(tsProcPath));
taosDirName(path);
}
strcat(path, "/udfd");
char* argsUdfd[] = {path, "-c", configDir, NULL};
options.args = argsUdfd;
options.file = path;
options.exit_cb = dmUdfdExit;
SUdfdData *pData = &pDnode->udfdData;
uv_pipe_init(&pData->loop, &pData->ctrlPipe, 1);
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe;
child_stdio[1].flags = UV_IGNORE;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio_count = 3;
options.stdio = child_stdio;
char dnodeIdEnvItem[32] = {0};
char thrdPoolSizeEnvItem[32] = {0};
snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pDnode->data.dnodeId);
float numCpuCores = 4;
taosGetCpuCores(&numCpuCores);
snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2);
char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL};
options.env = envUdfd;
int err = uv_spawn(&pData->loop, &pData->process, &options);
pData->process.data = (void*)pDnode;
if (err != 0) {
dError("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
static void dmUdfdCloseWalkCb(uv_handle_t* handle, void* arg) {
if (!uv_is_closing(handle)) {
uv_close(handle, NULL);
}
}
void dmWatchUdfd(void *args) {
SDnode *pDnode = args;
SUdfdData *pData = &pDnode->udfdData;
uv_loop_init(&pData->loop);
int32_t err = dmSpawnUdfd(pDnode);
atomic_store_32(&pData->spawnErr, err);
uv_barrier_wait(&pData->barrier);
uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop);
while (err == UV_EBUSY) {
uv_walk(&pData->loop, dmUdfdCloseWalkCb, NULL);
uv_run(&pData->loop, UV_RUN_DEFAULT);
err = uv_loop_close(&pData->loop);
}
return;
}
int32_t dmStartUdfd(SDnode *pDnode) {
SUdfdData *pData = &pDnode->udfdData;
if (pData->startCalled) {
dInfo("dnode-mgmt start udfd already called");
return 0;
}
uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier);
pData->startCalled = true;
pData->needCleanUp = true;
return pData->spawnErr;
}
int32_t dmStopUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt to stop udfd. need cleanup: %d, spawn err: %d",
pDnode->udfdData.needCleanUp, pDnode->udfdData.spawnErr);
SUdfdData *pData = &pDnode->udfdData;
if (!pData->needCleanUp) {
return 0;
}
atomic_store_8(&pData->stopping, 1);
uv_barrier_destroy(&pData->barrier);
if (pData->spawnErr == 0) {
uv_process_kill(&pData->process, SIGINT);
}
uv_stop(&pData->loop);
uv_thread_join(&pData->thread);
atomic_store_8(&pData->stopping, 0);
return 0;
}
static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to init"); dInfo("dnode-mgmt start to init");
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
...@@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { ...@@ -247,6 +367,10 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
} }
dmReportStartup(pDnode, "dnode-transport", "initialized"); dmReportStartup(pDnode, "dnode-transport", "initialized");
if (dmStartUdfd(pDnode) != 0) {
dError("failed to start udfd");
}
dInfo("dnode-mgmt is initialized"); dInfo("dnode-mgmt is initialized");
return 0; return 0;
} }
...@@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) { ...@@ -254,6 +378,7 @@ static int32_t dmInitMgmt(SMgmtWrapper *pWrapper) {
static void dmCleanupMgmt(SMgmtWrapper *pWrapper) { static void dmCleanupMgmt(SMgmtWrapper *pWrapper) {
dInfo("dnode-mgmt start to clean up"); dInfo("dnode-mgmt start to clean up");
SDnode *pDnode = pWrapper->pDnode; SDnode *pDnode = pWrapper->pDnode;
dmStopUdfd(pDnode);
dmStopWorker(pDnode); dmStopWorker(pDnode);
taosWLockLatch(&pDnode->data.latch); taosWLockLatch(&pDnode->data.latch);
......
...@@ -16,6 +16,7 @@ ...@@ -16,6 +16,7 @@
#ifndef _TD_DM_DEF_H_ #ifndef _TD_DM_DEF_H_
#define _TD_DM_DEF_H_ #define _TD_DM_DEF_H_
#include "uv.h"
#include "dmLog.h" #include "dmLog.h"
#include "cJSON.h" #include "cJSON.h"
...@@ -142,6 +143,18 @@ typedef struct { ...@@ -142,6 +143,18 @@ typedef struct {
char desc[TSDB_STEP_DESC_LEN]; char desc[TSDB_STEP_DESC_LEN];
} SStartupInfo; } SStartupInfo;
typedef struct SUdfdData {
bool startCalled;
bool needCleanUp;
uv_loop_t loop;
uv_thread_t thread;
uv_barrier_t barrier;
uv_process_t process;
int spawnErr;
int8_t stopping;
uv_pipe_t ctrlPipe;
} SUdfdData;
typedef struct SDnode { typedef struct SDnode {
EDndProcType ptype; EDndProcType ptype;
EDndNodeType ntype; EDndNodeType ntype;
...@@ -150,6 +163,7 @@ typedef struct SDnode { ...@@ -150,6 +163,7 @@ typedef struct SDnode {
SStartupInfo startup; SStartupInfo startup;
SDnodeTrans trans; SDnodeTrans trans;
SDnodeData data; SDnodeData data;
SUdfdData udfdData;
TdThreadMutex mutex; TdThreadMutex mutex;
SMgmtWrapper wrappers[NODE_END]; SMgmtWrapper wrappers[NODE_END];
} SDnode; } SDnode;
......
...@@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) { ...@@ -89,7 +89,6 @@ int32_t Testbase::SendShowReq(int8_t showType, const char *tb, const char* db) {
} }
SRetrieveTableReq retrieveReq = {0}; SRetrieveTableReq retrieveReq = {0};
retrieveReq.type = showType;
strcpy(retrieveReq.db, db); strcpy(retrieveReq.db, db);
strcpy(retrieveReq.tb, tb); strcpy(retrieveReq.tb, tb);
......
...@@ -391,7 +391,6 @@ typedef struct { ...@@ -391,7 +391,6 @@ typedef struct {
int16_t numOfColumns; int16_t numOfColumns;
int32_t rowSize; int32_t rowSize;
int32_t numOfRows; int32_t numOfRows;
int32_t payloadLen;
void* pIter; void* pIter;
SMnode* pMnode; SMnode* pMnode;
STableMetaRsp* pMeta; STableMetaRsp* pMeta;
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#define SHOW_STEP_SIZE 100 #define SHOW_STEP_SIZE 100
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq); static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq);
static void mndFreeShowObj(SShowObj *pShow); static void mndFreeShowObj(SShowObj *pShow);
static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId); static SShowObj *mndAcquireShowObj(SMnode *pMnode, int64_t showId);
static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove); static void mndReleaseShowObj(SShowObj *pShow, bool forceRemove);
...@@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) { ...@@ -47,18 +47,80 @@ void mndCleanupShow(SMnode *pMnode) {
} }
} }
static SShowObj *mndCreateShowObj(SMnode *pMnode, SShowReq *pReq) { static int32_t convertToRetrieveType(char* name, int32_t len) {
int32_t type = -1;
if (strncasecmp(name, TSDB_INS_TABLE_DNODES, len) == 0) {
type = TSDB_MGMT_TABLE_DNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, len) == 0) {
type = TSDB_MGMT_TABLE_MNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, len) == 0) {
type = TSDB_MGMT_TABLE_MODULE;
} else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, len) == 0) {
type = TSDB_MGMT_TABLE_QNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, len) == 0) {
type = TSDB_MGMT_TABLE_BNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, len) == 0) {
type = TSDB_MGMT_TABLE_SNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, len) == 0) {
type = TSDB_MGMT_TABLE_CLUSTER;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, len) == 0) {
type = TSDB_MGMT_TABLE_DB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, len) == 0) {
type = TSDB_MGMT_TABLE_FUNC;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, len) == 0) {
// type = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, len) == 0) {
type = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, len) == 0) {
type = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, len) == 0) {
type = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, len) == 0) {
// type = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, len) == 0) {
type = TSDB_MGMT_TABLE_USER;
} else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, len) == 0) {
type = TSDB_MGMT_TABLE_GRANTS;
} else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, len) == 0) {
type = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, len) == 0) {
type = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, len) == 0) {
type = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, len) == 0) {
type = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, len) == 0) {
type = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, len) == 0) {
type = TSDB_MGMT_TABLE_SMAS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, len) == 0) {
type = TSDB_MGMT_TABLE_CONFIGS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, len) == 0) {
type = TSDB_MGMT_TABLE_CONNS;
} else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, len) == 0) {
type = TSDB_MGMT_TABLE_QUERIES;
} else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, len) == 0) {
type = TSDB_MGMT_TABLE_VNODES;
} else {
// ASSERT(0);
}
return type;
}
static SShowObj *mndCreateShowObj(SMnode *pMnode, SRetrieveTableReq *pReq) {
SShowMgmt *pMgmt = &pMnode->showMgmt; SShowMgmt *pMgmt = &pMnode->showMgmt;
int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1); int64_t showId = atomic_add_fetch_64(&pMgmt->showId, 1);
if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1); if (showId == 0) atomic_add_fetch_64(&pMgmt->showId, 1);
int32_t size = sizeof(SShowObj) + pReq->payloadLen; int32_t size = sizeof(SShowObj);
SShowObj showObj = {0}; SShowObj showObj = {0};
showObj.id = showId; showObj.id = showId;
showObj.pMnode = pMnode; showObj.pMnode = pMnode;
showObj.type = pReq->type; showObj.type = convertToRetrieveType(pReq->tb, tListLen(pReq->tb));
showObj.payloadLen = pReq->payloadLen;
memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN); memcpy(showObj.db, pReq->db, TSDB_DB_FNAME_LEN);
int32_t keepTime = tsShellActivityTimer * 6 * 1000; int32_t keepTime = tsShellActivityTimer * 6 * 1000;
...@@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { ...@@ -127,10 +189,6 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
} }
if (retrieveReq.showId == 0) { if (retrieveReq.showId == 0) {
SShowReq req = {0};
req.type = retrieveReq.type;
strncpy(req.db, retrieveReq.db, tListLen(req.db));
STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1); STableMetaRsp *pMeta = (STableMetaRsp *)taosHashGet(pMnode->infosMeta, retrieveReq.tb, strlen(retrieveReq.tb) + 1);
if (pMeta == NULL) { if (pMeta == NULL) {
terrno = TSDB_CODE_MND_INVALID_INFOS_TBL; terrno = TSDB_CODE_MND_INVALID_INFOS_TBL;
...@@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) { ...@@ -138,7 +196,7 @@ static int32_t mndProcessRetrieveSysTableReq(SNodeMsg *pReq) {
return -1; return -1;
} }
pShow = mndCreateShowObj(pMnode, &req); pShow = mndCreateShowObj(pMnode, &retrieveReq);
if (pShow == NULL) { if (pShow == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to process show-meta req since %s", terrstr()); mError("failed to process show-meta req since %s", terrstr());
......
...@@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) { ...@@ -36,7 +36,7 @@ TEST_F(MndTestShow, 01_ShowMsg_InvalidMsgMax) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); ASSERT_NE(pRsp->code, 0);
} }
TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
...@@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) { ...@@ -50,7 +50,7 @@ TEST_F(MndTestShow, 02_ShowMsg_InvalidMsgStart) {
SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen); SRpcMsg* pRsp = test.SendReq(TDMT_MND_SYSTABLE_RETRIEVE, pReq, contLen);
ASSERT_NE(pRsp, nullptr); ASSERT_NE(pRsp, nullptr);
ASSERT_EQ(pRsp->code, TSDB_CODE_INVALID_MSG); ASSERT_NE(pRsp->code, 0);
} }
TEST_F(MndTestShow, 03_ShowMsg_Conn) { TEST_F(MndTestShow, 03_ShowMsg_Conn) {
......
...@@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo { ...@@ -382,7 +382,7 @@ typedef struct SSysTableScanInfo {
void* pCur; // cursor for iterate the local table meta store. void* pCur; // cursor for iterate the local table meta store.
SArray* scanCols; // SArray<int16_t> scan column id list SArray* scanCols; // SArray<int16_t> scan column id list
int32_t type; // show type, TODO remove it // int32_t type; // show type, TODO remove it
SName name; SName name;
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t capacity; int32_t capacity;
...@@ -479,7 +479,7 @@ typedef struct { ...@@ -479,7 +479,7 @@ typedef struct {
typedef struct SGroupbyOperatorInfo { typedef struct SGroupbyOperatorInfo {
SOptrBasicInfo binfo; SOptrBasicInfo binfo;
SArray* pGroupCols; SArray* pGroupCols; // group by columns, SArray<SColumn>
SArray* pGroupColVals; // current group column values, SArray<SGroupKeys> SArray* pGroupColVals; // current group column values, SArray<SGroupKeys>
SNode* pCondition; SNode* pCondition;
bool isInit; // denote if current val is initialized or not bool isInit; // denote if current val is initialized or not
...@@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo { ...@@ -600,7 +600,6 @@ typedef struct SJoinOperatorInfo {
int32_t rightPos; int32_t rightPos;
SColumnInfo rightCol; SColumnInfo rightCol;
SNode *pOnCondition; SNode *pOnCondition;
// SRspResultInfo resultInfo;
} SJoinOperatorInfo; } SJoinOperatorInfo;
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
......
...@@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1087,6 +1087,7 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pCtx[i].currentStage = MAIN_SCAN; pCtx[i].currentStage = MAIN_SCAN;
SInputColumnInfoData* pInput = &pCtx[i].input; SInputColumnInfoData* pInput = &pCtx[i].input;
pInput->uid = pBlock->info.uid;
SExprInfo* pOneExpr = &pOperator->pExpr[i]; SExprInfo* pOneExpr = &pOperator->pExpr[i];
for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) { for (int32_t j = 0; j < pOneExpr->base.numOfParams; ++j) {
...@@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt ...@@ -1101,7 +1102,9 @@ static int32_t doSetInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCt
pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column pInput->pPTS = taosArrayGet(pBlock->pDataBlock, 0); // todo set the correct timestamp column
ASSERT(pInput->pData[j] != NULL); ASSERT(pInput->pData[j] != NULL);
} else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) { } else if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
if (createDummyCol) { // todo avoid case: top(k, 12), 12 is the value parameter.
// sum(11), 11 is also the value parameter.
if (createDummyCol && pOneExpr->base.numOfParams == 1) {
code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows); code = doCreateConstantValColumnInfo(pInput, pFuncParam, pFuncParam->param.nType, j, pBlock->info.rows);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
...@@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, ...@@ -1876,67 +1879,58 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
} }
} }
pCtx->resDataInfo.interBufSize = env.calcMemSize; pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR || pExpr->pExpr->nodeType == QUERY_NODE_VALUE) { } else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN || pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR ||
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes; // for simple column, the intermediate buffer needs to hold one element. pExpr->pExpr->nodeType == QUERY_NODE_VALUE) {
// for simple column, the intermediate buffer needs to hold one element.
pCtx->resDataInfo.interBufSize = pFunct->resSchema.bytes;
} }
pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.numOfInputCols = pFunct->numOfParams;
pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pData = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES); pCtx->input.pColumnDataAgg = taosMemoryCalloc(pFunct->numOfParams, POINTER_BYTES);
pCtx->pTsOutput = NULL; pCtx->pTsOutput = NULL;
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes; pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
pCtx->resDataInfo.type = pFunct->resSchema.type; pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = TSDB_ORDER_ASC; pCtx->order = TSDB_ORDER_ASC;
pCtx->start.key = INT64_MIN; pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN; pCtx->end.key = INT64_MIN;
#if 0 pCtx->numOfParams = pExpr->base.numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// int16_t type = pFunct->param[j].nType; pCtx->param = pFunct->pParam;
// int16_t bytes = pFunct->param[j].nLen; // for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
// // set the order information for top/bottom query
// if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { // int32_t functionId = pCtx->functionId;
// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type); // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
// } else { // int32_t f = getExprFunctionId(&pExpr[0]);
// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type); // assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
// } //
} // // pCtx->param[2].i = pQueryAttr->order.order;
// // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// set the order information for top/bottom query // // pCtx->param[3].i = functionId;
int32_t functionId = pCtx->functionId; // // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { //
int32_t f = getExprFunctionId(&pExpr[0]); // // pCtx->param[1].i = pQueryAttr->order.col.info.colId;
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); // } else if (functionId == FUNCTION_INTERP) {
// // pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
// pCtx->param[2].i = pQueryAttr->order.order; // // if (pQueryAttr->fillVal != NULL) {
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; // // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
pCtx->param[3].i = functionId; // // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; // // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
// // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
// pCtx->param[1].i = pQueryAttr->order.col.info.colId; // // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
} else if (functionId == FUNCTION_INTERP) { // // }
// pCtx->param[2].i = (int8_t)pQueryAttr->fillType; // // }
// if (pQueryAttr->fillVal != NULL) { // // }
// if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { // } else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; // // pCtx->param[1].i = pQueryAttr->window.skey;
// } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value // // pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { // // pCtx->param[2].i = pQueryAttr->window.ekey;
// taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType); // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
// } // } else if (functionId == FUNCTION_ARITHM) {
// } // // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
// } // }
} else if (functionId == FUNCTION_TS_COMP) { // }
// pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_TWA) {
// pCtx->param[1].i = pQueryAttr->window.skey;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
// pCtx->param[2].i = pQueryAttr->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_ARITHM) {
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
}
#endif
} }
for (int32_t i = 1; i < numOfOutput; ++i) { for (int32_t i = 1; i < numOfOutput; ++i) {
...@@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) { ...@@ -1955,7 +1949,7 @@ static void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) { for (int32_t j = 0; j < pCtx[i].numOfParams; ++j) {
taosVariantDestroy(&pCtx[i].param[j]); taosVariantDestroy(&pCtx[i].param[j].param);
} }
taosVariantDestroy(&pCtx[i].tag); taosVariantDestroy(&pCtx[i].tag);
...@@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ...@@ -6487,9 +6481,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t numOfCols = 0; int32_t numOfCols = 0;
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId); tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
if (pDataReader == NULL) {
return NULL;
}
SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols); SArray* pColList = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
......
...@@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -319,7 +319,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
// The read handle is not initialized yet, since no qualified tables exists // The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->dataReader == NULL) { if (pTableScanInfo->dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} }
...@@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -375,10 +375,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
return p; return p;
} }
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, SOperatorInfo* createTableScanOperatorInfo(void* pDataReader, int32_t order, int32_t numOfOutput, int32_t dataLoadFlag,
int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock,
SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SNode* pCondition, SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
SInterval* pInterval, double sampleRatio, SExecTaskInfo* pTaskInfo) {
assert(repeatTime > 0); assert(repeatTime > 0);
STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo)); STableScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableScanInfo));
...@@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, ...@@ -391,19 +390,19 @@ SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order,
return NULL; return NULL;
} }
pInfo->interval = *pInterval; pInfo->interval = *pInterval;
pInfo->sampleRatio = sampleRatio; pInfo->sampleRatio = sampleRatio;
pInfo->dataBlockLoadFlag = dataLoadFlag; pInfo->dataBlockLoadFlag= dataLoadFlag;
pInfo->pResBlock = pResBlock; pInfo->pResBlock = pResBlock;
pInfo->pFilterNode = pCondition; pInfo->pFilterNode = pCondition;
pInfo->dataReader = pTsdbReadHandle; pInfo->dataReader = pDataReader;
pInfo->times = repeatTime; pInfo->times = repeatTime;
pInfo->reverseTimes = reverseTime; pInfo->reverseTimes = reverseTime;
pInfo->order = order; pInfo->order = order;
pInfo->current = 0; pInfo->current = 0;
pInfo->scanFlag = MAIN_SCAN; pInfo->scanFlag = MAIN_SCAN;
pInfo->pColMatchInfo = pColMatchInfo; pInfo->pColMatchInfo = pColMatchInfo;
pOperator->name = "TableScanOperator"; pOperator->name = "TableScanOperator";
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
pOperator->blockingOptr = false; pOperator->blockingOptr = false;
pOperator->status = OP_NOT_OPENED; pOperator->status = OP_NOT_OPENED;
...@@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) { ...@@ -677,7 +676,8 @@ static void destroySysScanOperator(void* param, int32_t numOfOutput) {
tsem_destroy(&pInfo->ready); tsem_destroy(&pInfo->ready);
blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pRes);
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
metaCloseTbCursor(pInfo->pCur); metaCloseTbCursor(pInfo->pCur);
} }
} }
...@@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -812,7 +812,8 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
SSysTableScanInfo* pInfo = pOperator->info; SSysTableScanInfo* pInfo = pOperator->info;
// retrieve local table list info from vnode // retrieve local table list info from vnode
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { const char* name = tNameGetTableName(&pInfo->name);
if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
if (pInfo->pCur == NULL) { if (pInfo->pCur == NULL) {
pInfo->pCur = metaOpenTbCursor(pInfo->readHandle); pInfo->pCur = metaOpenTbCursor(pInfo->readHandle);
} }
...@@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { ...@@ -864,8 +865,6 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
while (1) { while (1) {
int64_t startTs = taosGetTimestampUs(); int64_t startTs = taosGetTimestampUs();
pInfo->req.type = pInfo->type;
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb)); strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
if (pInfo->showRewrite) { if (pInfo->showRewrite) {
...@@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB ...@@ -947,68 +946,9 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataB
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pInfo->scanCols = colList; pInfo->scanCols = colList;
// TODO remove it
int32_t tableType = 0;
const char* name = tNameGetTableName(pName);
if (strncasecmp(name, TSDB_INS_TABLE_DNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_DNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_MNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_MODULES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_MODULE;
} else if (strncasecmp(name, TSDB_INS_TABLE_QNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_QNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_BNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_BNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_SNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SNODE;
} else if (strncasecmp(name, TSDB_INS_TABLE_CLUSTER, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CLUSTER;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_DATABASES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_DB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_FUNCTIONS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_FUNC;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_INDEXES, tListLen(pName->tname)) == 0) {
// tableType = TSDB_MGMT_TABLE_INDEX;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STABLES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_STB;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_STREAMS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_STREAMS;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TABLE;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED, tListLen(pName->tname)) == 0) {
// tableType = TSDB_MGMT_TABLE_DIST;
} else if (strncasecmp(name, TSDB_INS_TABLE_USER_USERS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_USER;
} else if (strncasecmp(name, TSDB_INS_TABLE_LICENCES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_GRANTS;
} else if (strncasecmp(name, TSDB_INS_TABLE_VGROUPS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_VGROUP;
} else if (strncasecmp(name, TSDB_INS_TABLE_TOPICS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TOPICS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONSUMERS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONSUMERS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SUBSCRIBES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SUBSCRIBES;
} else if (strncasecmp(name, TSDB_INS_TABLE_TRANS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_TRANS;
} else if (strncasecmp(name, TSDB_INS_TABLE_SMAS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_SMAS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONFIGS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONFIGS;
} else if (strncasecmp(name, TSDB_INS_TABLE_CONNS, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_CONNS;
} else if (strncasecmp(name, TSDB_INS_TABLE_QUERIES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_QUERIES;
} else if (strncasecmp(name, TSDB_INS_TABLE_VNODES, tListLen(pName->tname)) == 0) {
tableType = TSDB_MGMT_TABLE_VNODES;
} else {
ASSERT(0);
}
tNameAssign(&pInfo->name, pName); tNameAssign(&pInfo->name, pName);
pInfo->type = tableType; const char* name = tNameGetTableName(&pInfo->name);
if (pInfo->type == TSDB_MGMT_TABLE_TABLE) { if (strncasecmp(name, TSDB_INS_TABLE_USER_TABLES, TSDB_TABLE_FNAME_LEN) == 0) {
pInfo->readHandle = pSysTableReadHandle; pInfo->readHandle = pSysTableReadHandle;
blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity); blockDataEnsureCapacity(pInfo->pRes, pInfo->capacity);
} else { } else {
......
...@@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); ...@@ -58,6 +58,9 @@ bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
int32_t firstFunction(SqlFunctionCtx *pCtx); int32_t firstFunction(SqlFunctionCtx *pCtx);
int32_t lastFunction(SqlFunctionCtx *pCtx); int32_t lastFunction(SqlFunctionCtx *pCtx);
bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv);
int32_t topFunction(SqlFunctionCtx *pCtx);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -27,41 +27,31 @@ extern "C" { ...@@ -27,41 +27,31 @@ extern "C" {
#endif #endif
#define UDF_LISTEN_PIPE_NAME_LEN 32 #define UDF_LISTEN_PIPE_NAME_LEN 32
#define UDF_LISTEN_PIPE_NAME_PREFIX "udf.sock." #define UDF_LISTEN_PIPE_NAME_PREFIX "udfd.sock."
//====================================================================================== //======================================================================================
//begin API to taosd and qworker //begin API to taosd and qworker
enum { enum {
UDFC_CODE_STOPPING = -1, UDFC_CODE_STOPPING = -1,
UDFC_CODE_RESTARTING = -2,
UDFC_CODE_PIPE_READ_ERR = -3, UDFC_CODE_PIPE_READ_ERR = -3,
}; };
/*TODO: no api for dnode startudfd/stopudfd*/ typedef void *UdfcHandle;
/** typedef void *UdfcFuncHandle;
* start udfd dameon service
*/
int32_t startUdfd(int32_t dnodeId);
/**
* stop udfd dameon service
*/
int32_t stopUdfd(int32_t dnodeId);
/** /**
* create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf * create udfd proxy, called once in process that call setupUdf/callUdfxxx/teardownUdf
* @return error code * @return error code
*/ */
int32_t createUdfdProxy(int32_t dnodeId); int32_t udfcOpen(int32_t dnodeId, UdfcHandle* proxyHandle);
/** /**
* destroy udfd proxy * destroy udfd proxy
* @return error code * @return error code
*/ */
int32_t destroyUdfdProxy(int32_t dnodeId); int32_t udfcClose(UdfcHandle proxyhandle);
typedef void *UdfHandle;
/** /**
* setup udf * setup udf
...@@ -69,7 +59,7 @@ typedef void *UdfHandle; ...@@ -69,7 +59,7 @@ typedef void *UdfHandle;
* @param handle, out * @param handle, out
* @return error code * @return error code
*/ */
int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle); int32_t setupUdf(UdfcHandle proxyHandle, char udfName[], SEpSet *epSet, UdfcFuncHandle *handle);
typedef struct SUdfColumnMeta { typedef struct SUdfColumnMeta {
int16_t type; int16_t type;
...@@ -116,26 +106,26 @@ typedef struct SUdfInterBuf { ...@@ -116,26 +106,26 @@ typedef struct SUdfInterBuf {
} SUdfInterBuf; } SUdfInterBuf;
// output: interBuf // output: interBuf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf); int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf);
// input: block, state // input: block, state
// output: newState // output: newState
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState);
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData);
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData); int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData);
/** /**
* tearn down udf * tearn down udf
* @param handle * @param handle
* @return * @return
*/ */
int32_t teardownUdf(UdfHandle handle); int32_t teardownUdf(UdfcFuncHandle handle);
// end API to taosd and qworker // end API to taosd and qworker
//============================================================================================================================= //=============================================================================================================================
......
...@@ -30,20 +30,20 @@ typedef struct SUdfInfo { ...@@ -30,20 +30,20 @@ typedef struct SUdfInfo {
char *path; char *path;
} SUdfInfo; } SUdfInfo;
typedef void *UdfHandle; typedef void *UdfcFuncHandle;
int32_t createUdfdProxy(); int32_t createUdfdProxy();
int32_t destroyUdfdProxy(); int32_t destroyUdfdProxy();
//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfHandle *handles); //int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles);
int32_t setupUdf(SUdfInfo* udf, UdfHandle* handle); int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle);
int32_t callUdf(UdfHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate,
int32_t *newStateSize, SSDataBlock *output); int32_t *newStateSize, SSDataBlock *output);
int32_t teardownUdf(UdfHandle handle); int32_t teardownUdf(UdfcFuncHandle handle);
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[16]; // char udfName[16]; //
......
...@@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t ...@@ -193,8 +193,15 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// pseudo column do not need to check parameters
pFunc->node.resType = (SDataType){.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR};
return TSDB_CODE_SUCCESS;
}
static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType;
pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type};
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -497,9 +504,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.type = FUNCTION_TYPE_TOP, .type = FUNCTION_TYPE_TOP,
.classification = FUNC_MGT_AGG_FUNC, .classification = FUNC_MGT_AGG_FUNC,
.translateFunc = translateTop, .translateFunc = translateTop,
.getEnvFunc = getMinmaxFuncEnv, .getEnvFunc = getTopBotFuncEnv,
.initFunc = maxFunctionSetup, .initFunc = functionSetup,
.processFunc = maxFunction, .processFunc = topFunction,
.finalizeFunc = functionFinalize .finalizeFunc = functionFinalize
}, },
{ {
...@@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -876,7 +883,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.name = "tbname", .name = "tbname",
.type = FUNCTION_TYPE_TBNAME, .type = FUNCTION_TYPE_TBNAME,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC,
.translateFunc = NULL, .translateFunc = translateTbnameColumn,
.getEnvFunc = NULL, .getEnvFunc = NULL,
.initFunc = NULL, .initFunc = NULL,
.sprocessFunc = NULL, .sprocessFunc = NULL,
...@@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -914,7 +921,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
}, },
{ {
.name = "_wendts", .name = "_wendts",
.type = FUNCTION_TYPE_QENDTS, .type = FUNCTION_TYPE_WENDTS,
.classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC, .classification = FUNC_MGT_PSEUDO_COLUMN_FUNC | FUNC_MGT_WINDOW_PC_FUNC,
.translateFunc = translateTimePseudoColumn, .translateFunc = translateTimePseudoColumn,
.getEnvFunc = getTimePseudoFuncEnv, .getEnvFunc = getTimePseudoFuncEnv,
......
...@@ -14,10 +14,11 @@ ...@@ -14,10 +14,11 @@
*/ */
#include "builtinsimpl.h" #include "builtinsimpl.h"
#include "tpercentile.h" #include <libs/nodes/querynodes.h>
#include "querynodes.h" #include "querynodes.h"
#include "taggfunction.h" #include "taggfunction.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tpercentile.h"
#define SET_VAL(_info, numOfElem, res) \ #define SET_VAL(_info, numOfElem, res) \
do { \ do { \
...@@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) { ...@@ -472,17 +473,6 @@ int32_t maxFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
typedef struct STopBotRes {
int32_t num;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
return true;
}
typedef struct SStddevRes { typedef struct SStddevRes {
double result; double result;
int64_t count; int64_t count;
...@@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) { ...@@ -523,7 +513,7 @@ int32_t stddevFunction(SqlFunctionCtx* pCtx) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_TINYINT: { case TSDB_DATA_TYPE_TINYINT: {
int8_t* plist = (int8_t*)pCol->pData; int8_t* plist = (int8_t*)pCol->pData;
for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue; continue;
} }
...@@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) { ...@@ -749,9 +739,9 @@ int32_t percentileFunction(SqlFunctionCtx *pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// TODO set the correct parameter.
void percentileFinalize(SqlFunctionCtx* pCtx) { void percentileFinalize(SqlFunctionCtx* pCtx) {
double v = 50;//pCtx->param[0].nType == TSDB_DATA_TYPE_INT ? pCtx->param[0].i64 : pCtx->param[0].dKey; SVariant* pVal = &pCtx->param[1].param;
double v = pVal->nType == TSDB_DATA_TYPE_INT ? pVal->i : pVal->d;
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx); SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo); SPercentileInfo* ppInfo = (SPercentileInfo *) GET_ROWCELL_INTERBUF(pResInfo);
...@@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) { ...@@ -1173,3 +1163,130 @@ int32_t diffFunction(SqlFunctionCtx *pCtx) {
} }
} }
typedef struct STopBotResItem {
SVariant v;
uint64_t uid; // it is a table uid, used to extract tag data during building of the final result for the tag data
struct {
int32_t pageId;
int32_t offset;
} tuplePos; // tuple data of this chosen row
} STopBotResItem;
typedef struct STopBotRes {
int32_t num;
STopBotResItem *pItems;
} STopBotRes;
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pColNode = (SColumnNode*) nodesListGetNode(pFunc->pParameterList, 0);
int32_t bytes = pColNode->node.resType.bytes;
SValueNode* pkNode = (SValueNode*) nodesListGetNode(pFunc->pParameterList, 1);
pEnv->calcMemSize = sizeof(STopBotRes) + pkNode->datum.i * bytes;
return true;
}
static STopBotRes *getTopBotOutputInfo(SqlFunctionCtx *pCtx) {
SResultRowEntryInfo *pResInfo = GET_RES_INFO(pCtx);
STopBotRes* pRes = GET_ROWCELL_INTERBUF(pResInfo);
pRes->pItems = (STopBotResItem*)((char*) pRes + sizeof(STopBotRes));
return pRes;
}
static void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid);
int32_t topFunction(SqlFunctionCtx *pCtx) {
int32_t numOfElems = 0;
STopBotRes *pRes = getTopBotOutputInfo(pCtx);
assert(pRes->num >= 0);
// if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) {
// buildTopBotStruct(pRes, pCtx);
// }
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pCol = pInput->pData[0];
int32_t type = pInput->pData[0]->info.type;
int32_t start = pInput->startRowIndex;
int32_t numOfRows = pInput->numOfRows;
for (int32_t i = start; i < numOfRows + start; ++i) {
if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) {
continue;
}
numOfElems++;
char* data = colDataGetData(pCol, i);
doAddIntoResult(pRes, pCtx->param[1].param.i, data, type, pInput->uid);
}
// treat the result as only one result
SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);
return TSDB_CODE_SUCCESS;
}
static int32_t topBotResComparFn(const void *p1, const void *p2, const void *param) {
uint16_t type = *(uint16_t *) param;
STopBotResItem *val1 = (STopBotResItem *) p1;
STopBotResItem *val2 = (STopBotResItem *) p2;
if (IS_SIGNED_NUMERIC_TYPE(type)) {
if (val1->v.i == val2->v.i) {
return 0;
}
return (val1->v.i > val2->v.i) ? 1 : -1;
} else if (IS_UNSIGNED_NUMERIC_TYPE(type)) {
if (val1->v.u == val2->v.u) {
return 0;
}
return (val1->v.u > val2->v.u) ? 1 : -1;
}
if (val1->v.d == val2->v.d) {
return 0;
}
return (val1->v.d > val2->v.d) ? 1 : -1;
}
void doAddIntoResult(STopBotRes *pRes, int32_t maxSize, void *pData, uint16_t type, uint64_t uid) {
SVariant val = {0};
taosVariantCreateFromBinary(&val, pData, tDataTypes[type].bytes, type);
STopBotResItem *pItems = pRes->pItems;
assert(pItems != NULL);
// not full yet
if (pRes->num < maxSize) {
STopBotResItem* pItem = &pItems[pRes->num];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
pRes->num++;
taosheapsort((void *) pItem, sizeof(STopBotResItem), pRes->num, (const void *) &type, topBotResComparFn, false);
} else { // replace the minimum value in the result
if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) ||
(IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) ||
(IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) {
STopBotResItem* pItem = &pItems[pRes->num];
pItem->v = val;
pItem->uid = uid;
pItem->tuplePos.pageId = -1; // todo set the corresponding tuple data in the disk-based buffer
taosheapadjust((void *) pItem, sizeof(STopBotResItem), 0, pRes->num - 1, (const void *) &type, topBotResComparFn, NULL, false);
}
}
}
void topBotFinalize(SqlFunctionCtx* pCtx) {
functionFinalize(pCtx);
}
\ No newline at end of file
...@@ -14,19 +14,15 @@ ...@@ -14,19 +14,15 @@
*/ */
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "tlog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tarray.h" #include "tarray.h"
#include "tdatablock.h" #include "tdatablock.h"
//TODO: when startup, set thread poll size. add it to cfg
//TODO: test for udfd restart
//TODO: udfd restart when exist or aborts
//TODO: deal with uv task that has been started and then udfd core dumped
//TODO: network error processing. //TODO: network error processing.
//TODO: add unit test //TODO: add unit test
//TODO: include all global variable under context struct //TODO: include all global variable under context struct
/* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl> /* Copyright (c) 2013, Ben Noordhuis <info@bnoordhuis.nl>
* The QUEUE is copied from queue.h under libuv * The QUEUE is copied from queue.h under libuv
* */ * */
...@@ -125,12 +121,35 @@ enum { ...@@ -125,12 +121,35 @@ enum {
UV_TASK_DISCONNECT = 2 UV_TASK_DISCONNECT = 2
}; };
int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfdProxy {
int32_t dnodeId;
uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int8_t gUdfcState;
QUEUE gUdfTaskQueue;
QUEUE gUvProcTaskQueue;
// int8_t gUdfcState = UDFC_STATE_INITAL;
// QUEUE gUdfTaskQueue = {0};
// QUEUE gUvProcTaskQueue = {0};
} SUdfdProxy;
typedef struct SUdfUvSession { typedef struct SUdfUvSession {
SUdfdProxy *udfc;
int64_t severHandle; int64_t severHandle;
uv_pipe_t *udfSvcPipe; uv_pipe_t *udfSvcPipe;
} SUdfUvSession; } SUdfUvSession;
typedef struct SClientUvTaskNode { typedef struct SClientUvTaskNode {
SUdfdProxy *udfc;
int8_t type; int8_t type;
int errCode; int errCode;
...@@ -169,7 +188,6 @@ typedef struct SClientUdfTask { ...@@ -169,7 +188,6 @@ typedef struct SClientUdfTask {
} _teardown; } _teardown;
}; };
} SClientUdfTask; } SClientUdfTask;
typedef struct SClientConnBuf { typedef struct SClientConnBuf {
...@@ -185,34 +203,13 @@ typedef struct SClientUvConn { ...@@ -185,34 +203,13 @@ typedef struct SClientUvConn {
SClientConnBuf readBuf; SClientConnBuf readBuf;
} SClientUvConn; } SClientUvConn;
uv_process_t gUdfdProcess;
uv_barrier_t gUdfInitBarrier;
uv_loop_t gUdfdLoop;
uv_thread_t gUdfLoopThread;
uv_async_t gUdfLoopTaskAync;
uv_async_t gUdfLoopStopAsync;
uv_mutex_t gUdfTaskQueueMutex;
int64_t gUdfTaskSeqNum = 0;
enum { enum {
UDFC_STATE_INITAL = 0, // initial state UDFC_STATE_INITAL = 0, // initial state
UDFC_STATE_STARTNG, // starting after createUdfdProxy UDFC_STATE_STARTNG, // starting after udfcOpen
UDFC_STATE_READY, // started and begin to receive quests UDFC_STATE_READY, // started and begin to receive quests
UDFC_STATE_RESTARTING, // udfd abnormal exit. cleaning up and restart. UDFC_STATE_STOPPING, // stopping after udfcClose
UDFC_STATE_STOPPING, // stopping after destroyUdfdProxy
UDFC_STATUS_FINAL, // stopped UDFC_STATUS_FINAL, // stopped
}; };
int8_t gUdfcState = UDFC_STATE_INITAL;
//double circular linked list
QUEUE gUdfTaskQueue = {0};
QUEUE gUvProcTaskQueue = {0};
int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) {
int32_t len = 0; int32_t len = 0;
...@@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) { ...@@ -777,13 +774,14 @@ void onUdfClientConnect(uv_connect_t *connect, int status) {
int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) { int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode **pUvTask) {
SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode)); SClientUvTaskNode *uvTask = taosMemoryCalloc(1, sizeof(SClientUvTaskNode));
uvTask->type = uvTaskType; uvTask->type = uvTaskType;
uvTask->udfc = task->session->udfc;
if (uvTaskType == UV_TASK_CONNECT) { if (uvTaskType == UV_TASK_CONNECT) {
} else if (uvTaskType == UV_TASK_REQ_RSP) { } else if (uvTaskType == UV_TASK_REQ_RSP) {
uvTask->pipe = task->session->udfSvcPipe; uvTask->pipe = task->session->udfSvcPipe;
SUdfRequest request; SUdfRequest request;
request.type = task->type; request.type = task->type;
request.seqNum = gUdfTaskSeqNum++; request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1);
if (task->type == UDF_TASK_SETUP) { if (task->type == UDF_TASK_SETUP) {
request.setup = task->_setup.req; request.setup = task->_setup.req;
...@@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN ...@@ -815,11 +813,11 @@ int32_t createUdfcUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskN
int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) { int32_t queueUvUdfTask(SClientUvTaskNode *uvTask) {
debugPrint("%s, %d", "queue uv task", uvTask->type); debugPrint("%s, %d", "queue uv task", uvTask->type);
SUdfdProxy *udfc = uvTask->udfc;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_INSERT_TAIL(&gUdfTaskQueue, &uvTask->recvTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUdfTaskQueue, &uvTask->recvTaskQueue);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopTaskAync); uv_async_send(&udfc->gUdfLoopTaskAync);
uv_sem_wait(&uvTask->taskSem); uv_sem_wait(&uvTask->taskSem);
uv_sem_destroy(&uvTask->taskSem); uv_sem_destroy(&uvTask->taskSem);
...@@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -832,7 +830,7 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
switch (uvTask->type) { switch (uvTask->type) {
case UV_TASK_CONNECT: { case UV_TASK_CONNECT: {
uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t)); uv_pipe_t *pipe = taosMemoryMalloc(sizeof(uv_pipe_t));
uv_pipe_init(&gUdfdLoop, pipe, 0); uv_pipe_init(&uvTask->udfc->gUdfdLoop, pipe, 0);
uvTask->pipe = pipe; uvTask->pipe = pipe;
SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn)); SClientUvConn *conn = taosMemoryMalloc(sizeof(SClientUvConn));
...@@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) { ...@@ -873,142 +871,98 @@ int32_t startUvUdfTask(SClientUvTaskNode *uvTask) {
} }
void udfClientAsyncCb(uv_async_t *async) { void udfClientAsyncCb(uv_async_t *async) {
SUdfdProxy *udfc = async->data;
QUEUE wq; QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
startUvUdfTask(task); startUvUdfTask(task);
QUEUE_INSERT_TAIL(&gUvProcTaskQueue, &task->procTaskQueue); QUEUE_INSERT_TAIL(&udfc->gUvProcTaskQueue, &task->procTaskQueue);
} }
} }
void cleanUpUvTasks() { void cleanUpUvTasks(SUdfdProxy *udfc) {
QUEUE wq; QUEUE wq;
uv_mutex_lock(&gUdfTaskQueueMutex); uv_mutex_lock(&udfc->gUdfTaskQueueMutex);
QUEUE_MOVE(&gUdfTaskQueue, &wq); QUEUE_MOVE(&udfc->gUdfTaskQueue, &wq);
uv_mutex_unlock(&gUdfTaskQueueMutex); uv_mutex_unlock(&udfc->gUdfTaskQueueMutex);
while (!QUEUE_EMPTY(&wq)) { while (!QUEUE_EMPTY(&wq)) {
QUEUE* h = QUEUE_HEAD(&wq); QUEUE* h = QUEUE_HEAD(&wq);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) { if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING; task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
// TODO: deal with tasks that are waiting result. // TODO: deal with tasks that are waiting result.
while (!QUEUE_EMPTY(&gUvProcTaskQueue)) { while (!QUEUE_EMPTY(&udfc->gUvProcTaskQueue)) {
QUEUE* h = QUEUE_HEAD(&gUvProcTaskQueue); QUEUE* h = QUEUE_HEAD(&udfc->gUvProcTaskQueue);
QUEUE_REMOVE(h); QUEUE_REMOVE(h);
SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue);
if (gUdfcState == UDFC_STATE_STOPPING) { if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
task->errCode = UDFC_CODE_STOPPING; task->errCode = UDFC_CODE_STOPPING;
} else if (gUdfcState == UDFC_STATE_RESTARTING) {
task->errCode = UDFC_CODE_RESTARTING;
} }
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
} }
void udfStopAsyncCb(uv_async_t *async) { void udfStopAsyncCb(uv_async_t *async) {
cleanUpUvTasks(); SUdfdProxy *udfc = async->data;
if (gUdfcState == UDFC_STATE_STOPPING) { cleanUpUvTasks(udfc);
uv_stop(&gUdfdLoop); if (udfc->gUdfcState == UDFC_STATE_STOPPING) {
uv_stop(&udfc->gUdfdLoop);
} }
} }
int32_t udfcSpawnUdfd();
void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
//TODO: pipe close will be first received
debugPrint("Process exited with status %" PRId64 ", signal %d", exit_status, term_signal);
uv_close((uv_handle_t *) req, NULL);
//TODO: restart the udfd process
if (gUdfcState == UDFC_STATE_STOPPING) {
if (term_signal != SIGINT) {
//TODO: log error
}
}
if (gUdfcState == UDFC_STATE_READY) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
//cleanUpUvTasks();
udfcSpawnUdfd();
}
}
int32_t udfcSpawnUdfd() {
//TODO: path
uv_process_options_t options = {0};
static char path[256] = {0};
size_t cwdSize;
uv_cwd(path, &cwdSize);
strcat(path, "/udfd");
char* args[2] = {path, NULL};
options.args = args;
options.file = path;
options.exit_cb = onUdfdExit;
options.stdio_count = 3;
uv_stdio_container_t child_stdio[3];
child_stdio[0].flags = UV_IGNORE;
child_stdio[1].flags = UV_INHERIT_FD;
child_stdio[1].data.fd = 1;
child_stdio[2].flags = UV_INHERIT_FD;
child_stdio[2].data.fd = 2;
options.stdio = child_stdio;
//TODO spawn error
int err = uv_spawn(&gUdfdLoop, &gUdfdProcess, &options);
if (err != 0) {
debugPrint("can not spawn udfd. path: %s, error: %s", path, uv_strerror(err));
}
return err;
}
void constructUdfService(void *argsThread) { void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop); SUdfdProxy *udfc = (SUdfdProxy*)argsThread;
uv_loop_init(&udfc->gUdfdLoop);
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb); uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopTaskAync, udfClientAsyncCb);
uv_mutex_init(&gUdfTaskQueueMutex); udfc->gUdfLoopTaskAync.data = udfc;
QUEUE_INIT(&gUdfTaskQueue); uv_async_init(&udfc->gUdfdLoop, &udfc->gUdfLoopStopAsync, udfStopAsyncCb);
QUEUE_INIT(&gUvProcTaskQueue); udfc->gUdfLoopStopAsync.data = udfc;
uv_barrier_wait(&gUdfInitBarrier); uv_mutex_init(&udfc->gUdfTaskQueueMutex);
QUEUE_INIT(&udfc->gUdfTaskQueue);
QUEUE_INIT(&udfc->gUvProcTaskQueue);
uv_barrier_wait(&udfc->gUdfInitBarrier);
//TODO return value of uv_run //TODO return value of uv_run
uv_run(&gUdfdLoop, UV_RUN_DEFAULT); uv_run(&udfc->gUdfdLoop, UV_RUN_DEFAULT);
uv_loop_close(&gUdfdLoop); uv_loop_close(&udfc->gUdfdLoop);
} }
int32_t udfcOpen(int32_t dnodeId, UdfcHandle *udfc) {
int32_t createUdfdProxy(int32_t dnodeId) { SUdfdProxy *proxy = taosMemoryCalloc(1, sizeof(SUdfdProxy));
gUdfcState = UDFC_STATE_STARTNG; proxy->dnodeId = dnodeId;
uv_barrier_init(&gUdfInitBarrier, 2); proxy->gUdfcState = UDFC_STATE_STARTNG;
uv_thread_create(&gUdfLoopThread, constructUdfService, 0); uv_barrier_init(&proxy->gUdfInitBarrier, 2);
uv_barrier_wait(&gUdfInitBarrier); gUdfcState = UDFC_STATE_READY; uv_thread_create(&proxy->gUdfLoopThread, constructUdfService, proxy);
uv_barrier_wait(&proxy->gUdfInitBarrier);
proxy->gUdfcState = UDFC_STATE_READY;
*udfc = proxy;
return 0; return 0;
} }
int32_t destroyUdfdProxy(int32_t dnodeId) { int32_t udfcClose(UdfcHandle udfcHandle) {
gUdfcState = UDFC_STATE_STOPPING; SUdfdProxy *udfc = udfcHandle;
uv_barrier_destroy(&gUdfInitBarrier); udfc->gUdfcState = UDFC_STATE_STOPPING;
// if (gUdfcState == UDFC_STATE_STOPPING) { uv_async_send(&udfc->gUdfLoopStopAsync);
// uv_process_kill(&gUdfdProcess, SIGINT); uv_thread_join(&udfc->gUdfLoopThread);
// } uv_mutex_destroy(&udfc->gUdfTaskQueueMutex);
uv_async_send(&gUdfLoopStopAsync); uv_barrier_destroy(&udfc->gUdfInitBarrier);
uv_thread_join(&gUdfLoopThread); udfc->gUdfcState = UDFC_STATUS_FINAL;
uv_mutex_destroy(&gUdfTaskQueueMutex); taosMemoryFree(udfc);
gUdfcState = UDFC_STATUS_FINAL;
return 0; return 0;
} }
...@@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1026,11 +980,12 @@ int32_t udfcRunUvTask(SClientUdfTask *task, int8_t uvTaskType) {
return task->errCode; return task->errCode;
} }
int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { int32_t setupUdf(UdfcHandle udfc, char udfName[], SEpSet *epSet, UdfcFuncHandle *funcHandle) {
debugPrint("%s", "client setup udf"); debugPrint("%s", "client setup udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryMalloc(sizeof(SUdfUvSession)); task->session = taosMemoryMalloc(sizeof(SUdfUvSession));
task->session->udfc = udfc;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
...@@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) { ...@@ -1046,13 +1001,13 @@ int32_t setupUdf(char udfName[], SEpSet *epSet, UdfHandle *handle) {
SUdfSetupResponse *rsp = &task->_setup.rsp; SUdfSetupResponse *rsp = &task->_setup.rsp;
task->session->severHandle = rsp->udfHandle; task->session->severHandle = rsp->udfHandle;
*handle = task->session; *funcHandle = task->session;
int32_t err = task->errCode; int32_t err = task->errCode;
taosMemoryFree(task); taosMemoryFree(task);
return err; return err;
} }
int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) { SSDataBlock* output, SUdfInterBuf *newState) {
debugPrint("%s", "client call udf"); debugPrint("%s", "client call udf");
...@@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter ...@@ -1121,7 +1076,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
} }
//TODO: translate these calls to callUdf //TODO: translate these calls to callUdf
int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { int32_t callUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_INIT; int8_t callType = TSDB_UDF_CALL_AGG_INIT;
int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf); int32_t err = callUdf(handle, callType, NULL, NULL, NULL, NULL, interBuf);
...@@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) { ...@@ -1131,7 +1086,7 @@ int32_t callUdfAggInit(UdfHandle handle, SUdfInterBuf *interBuf) {
// input: block, state // input: block, state
// output: interbuf, // output: interbuf,
int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { int32_t callUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState);
return err; return err;
...@@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st ...@@ -1139,7 +1094,7 @@ int32_t callUdfAggProcess(UdfHandle handle, SSDataBlock *block, SUdfInterBuf *st
// input: interbuf1, interbuf2 // input: interbuf1, interbuf2
// output: resultBuf // output: resultBuf
int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) {
int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int8_t callType = TSDB_UDF_CALL_AGG_MERGE;
int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf);
return err; return err;
...@@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf ...@@ -1147,7 +1102,7 @@ int32_t callUdfAggMerge(UdfHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf
// input: interBuf // input: interBuf
// output: resultData // output: resultData
int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) {
int8_t callType = TSDB_UDF_CALL_AGG_PROC; int8_t callType = TSDB_UDF_CALL_AGG_PROC;
int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData);
return err; return err;
...@@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu ...@@ -1155,13 +1110,13 @@ int32_t callUdfAggFinalize(UdfHandle handle, SUdfInterBuf *interBuf, SUdfInterBu
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfHandle handle, SSDataBlock *block, SSDataBlock *resultData) { int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL);
return err; return err;
} }
int32_t teardownUdf(UdfHandle handle) { int32_t teardownUdf(UdfcFuncHandle handle) {
debugPrint("%s", "client teardown udf"); debugPrint("%s", "client teardown udf");
SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryMalloc(sizeof(SClientUdfTask));
......
...@@ -27,7 +27,10 @@ ...@@ -27,7 +27,10 @@
typedef struct SUdfdContext { typedef struct SUdfdContext {
uv_loop_t *loop; uv_loop_t *loop;
uv_pipe_t ctrlPipe;
uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_pipe_t listeningPipe;
void *clientRpc; void *clientRpc;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
...@@ -76,9 +79,9 @@ typedef struct SUdf { ...@@ -76,9 +79,9 @@ typedef struct SUdf {
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix // TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure. // TODO: add private udf structure.
typedef struct SUdfHandle { typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfHandle; } SUdfcFuncHandle;
int32_t udfdLoadUdf(char* udfName, SUdf* udf) { int32_t udfdLoadUdf(char* udfName, SUdf* udf) {
strcpy(udf->name, udfName); strcpy(udf->name, udfName);
...@@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -143,7 +146,7 @@ void udfdProcessRequest(uv_work_t *req) {
} }
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
} }
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle)); SUdfcFuncHandle *handle = taosMemoryMalloc(sizeof(SUdfcFuncHandle));
handle->udf = udf; handle->udf = udf;
// TODO: allocate private structure and call init function and set it to handle // TODO: allocate private structure and call init function and set it to handle
SUdfResponse rsp; SUdfResponse rsp;
...@@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -166,7 +169,7 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call; SUdfCallRequest *call = &request.call;
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle);
SUdfHandle *handle = (SUdfHandle *)(call->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
...@@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -204,7 +207,7 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown; SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle)
SUdfHandle *handle = (SUdfHandle *)(teardown->udfHandle); SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
...@@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) { ...@@ -380,10 +383,12 @@ void udfdOnNewConnection(uv_stream_t *server, int status) {
} }
} }
void removeListeningPipe(int sig) { void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
fnInfo("udfd signal received: %d\n", signum);
uv_fs_t req; uv_fs_t req;
uv_fs_unlink(global.loop, &req, "udf.sock", NULL); uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
exit(0); uv_signal_stop(handle);
uv_stop(global.loop);
} }
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
...@@ -492,37 +497,67 @@ static int32_t udfdInitLog() { ...@@ -492,37 +497,67 @@ static int32_t udfdInitLog() {
return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0); return taosCreateLog(logName, 1, configDir, NULL, NULL, NULL, 0);
} }
void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
buf->base = taosMemoryMalloc(suggested_size);
buf->len = suggested_size;
}
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0) {
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
uv_close((uv_handle_t*)q, NULL);
uv_stop(global.loop);
return;
}
fnError("udfd ctrl pipe read %zu bytes", nread);
taosMemoryFree(buf->base);
}
static int32_t removeListeningPipe() {
uv_fs_t req;
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_fs_req_cleanup(&req);
return err;
}
static int32_t udfdUvInit() { static int32_t udfdUvInit() {
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) { if (loop) {
uv_loop_init(loop); uv_loop_init(loop);
} }
global.loop = loop; global.loop = loop;
uv_pipe_init(global.loop, &global.ctrlPipe, 1);
uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0}; char dnodeId[8] = {0};
size_t dnodeIdSize; size_t dnodeIdSize;
uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
if (err != 0) {
dnodeId[0] = '1';
}
char listenPipeName[32] = {0}; char listenPipeName[32] = {0};
snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); snprintf(listenPipeName, sizeof(listenPipeName), "%s%s", UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId);
strcpy(global.listenPipeName, listenPipeName); strcpy(global.listenPipeName, listenPipeName);
uv_fs_t req; removeListeningPipe();
uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_pipe_t server; uv_pipe_init(global.loop, &global.listeningPipe, 0);
uv_pipe_init(global.loop, &server, 0);
signal(SIGINT, removeListeningPipe); uv_signal_init(global.loop, &global.intrSignal);
uv_signal_start(&global.intrSignal, udfdIntrSignalHandler, SIGINT);
int r; int r;
fnInfo("bind to pipe %s", global.listenPipeName); fnInfo("bind to pipe %s", global.listenPipeName);
if ((r = uv_pipe_bind(&server, listenPipeName))) { if ((r = uv_pipe_bind(&global.listeningPipe, listenPipeName))) {
fnError("Bind error %s", uv_err_name(r)); fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(0); removeListeningPipe();
return -1; return -1;
} }
if ((r = uv_listen((uv_stream_t *)&server, 128, udfdOnNewConnection))) { if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
fnError("Listen error %s", uv_err_name(r)); fnError("Listen error %s", uv_err_name(r));
removeListeningPipe(0); removeListeningPipe();
return -2; return -2;
} }
return 0; return 0;
......
...@@ -8,7 +8,8 @@ ...@@ -8,7 +8,8 @@
#include "tdatablock.h" #include "tdatablock.h"
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
createUdfdProxy(1); UdfcHandle udfc;
udfcOpen(1, &udfc);
uv_sleep(1000); uv_sleep(1000);
char path[256] = {0}; char path[256] = {0};
size_t cwdSize = 256; size_t cwdSize = 256;
...@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) { ...@@ -20,9 +21,9 @@ int main(int argc, char *argv[]) {
fprintf(stdout, "current working directory:%s\n", path); fprintf(stdout, "current working directory:%s\n", path);
strcat(path, "/libudf1.so"); strcat(path, "/libudf1.so");
UdfHandle handle; UdfcFuncHandle handle;
SEpSet epSet; SEpSet epSet;
setupUdf("udf1", &epSet, &handle); setupUdf(udfc, "udf1", &epSet, &handle);
SSDataBlock block = {0}; SSDataBlock block = {0};
SSDataBlock* pBlock = &block; SSDataBlock* pBlock = &block;
...@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) { ...@@ -53,5 +54,5 @@ int main(int argc, char *argv[]) {
} }
teardownUdf(handle); teardownUdf(handle);
destroyUdfdProxy(1); udfcClose(udfc);
} }
...@@ -63,13 +63,14 @@ typedef struct { ...@@ -63,13 +63,14 @@ typedef struct {
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
int (*retry)(void* parent, SRpcMsg*, SEpSet*);
int32_t refCount; int32_t refCount;
void* parent; void* parent;
void* idPool; // handle to ID pool void* idPool; // handle to ID pool
void* tmrCtrl; // handle to timer void* tmrCtrl; // handle to timer
SHashObj* hash; // handle returned by hash utility SHashObj* hash; // handle returned by hash utility
void* tcphandle; // returned handle from TCP initialization void* tcphandle; // returned handle from TCP initialization
TdThreadMutex mutex; TdThreadMutex mutex;
} SRpcInfo; } SRpcInfo;
......
...@@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) { ...@@ -39,6 +39,7 @@ void* rpcOpen(const SRpcInit* pInit) {
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;
pRpc->afp = pInit->afp; pRpc->afp = pInit->afp;
pRpc->retry = pInit->rfp;
if (pInit->connType == TAOS_CONN_SERVER) { if (pInit->connType == TAOS_CONN_SERVER) {
pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
...@@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) { ...@@ -100,11 +101,10 @@ void rpcSendRedirectRsp(void* thandle, const SEpSet* pEpSet) {
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
memset(&rpcMsg, 0, sizeof(rpcMsg)); memset(&rpcMsg, 0, sizeof(rpcMsg));
rpcMsg.contLen = sizeof(SEpSet); SMEpSet msg = {.epSet = *pEpSet};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); int32_t len = tSerializeSMEpSet(NULL, 0, &msg);
if (rpcMsg.pCont == NULL) return; rpcMsg.pCont = rpcMallocCont(len);
tSerializeSMEpSet(rpcMsg.pCont, len, &msg);
memcpy(rpcMsg.pCont, pEpSet, sizeof(SEpSet));
rpcMsg.code = TSDB_CODE_RPC_REDIRECT; rpcMsg.code = TSDB_CODE_RPC_REDIRECT;
rpcMsg.handle = thandle; rpcMsg.handle = thandle;
......
...@@ -31,12 +31,8 @@ typedef struct SCliConn { ...@@ -31,12 +31,8 @@ typedef struct SCliConn {
int hThrdIdx; int hThrdIdx;
STransCtx ctx; STransCtx ctx;
bool broken; // link broken or not bool broken; // link broken or not
ConnStatus status; // ConnStatus status; //
int release; // 1: release
// spi configure
char spi;
char secured;
char* ip; char* ip;
uint32_t port; uint32_t port;
...@@ -44,7 +40,6 @@ typedef struct SCliConn { ...@@ -44,7 +40,6 @@ typedef struct SCliConn {
// debug and log info // debug and log info
struct sockaddr_in addr; struct sockaddr_in addr;
struct sockaddr_in locaddr; struct sockaddr_in locaddr;
} SCliConn; } SCliConn;
typedef struct SCliMsg { typedef struct SCliMsg {
...@@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status); ...@@ -102,6 +97,8 @@ static void cliSendCb(uv_write_t* req, int status);
static void cliConnCb(uv_connect_t* req, int status); static void cliConnCb(uv_connect_t* req, int status);
static void cliAsyncCb(uv_async_t* handle); static void cliAsyncCb(uv_async_t* handle);
static void cliAppCb(SCliConn* pConn, STransMsg* pMsg);
static SCliConn* cliCreateConn(SCliThrdObj* thrd); static SCliConn* cliCreateConn(SCliThrdObj* thrd);
static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/); static void cliDestroyConn(SCliConn* pConn, bool clear /*clear tcp handle or not*/);
static void cliDestroy(uv_handle_t* handle); static void cliDestroy(uv_handle_t* handle);
...@@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) { ...@@ -303,8 +300,6 @@ void cliHandleResp(SCliConn* conn) {
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
conn->secured = pHead->secured;
if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) { if (pCtx == NULL && CONN_NO_PERSIST_BY_APP(conn)) {
tTrace("except, server continue send while cli ignore it"); tTrace("except, server continue send while cli ignore it");
// transUnrefCliHandle(conn); // transUnrefCliHandle(conn);
...@@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) { ...@@ -318,7 +313,8 @@ void cliHandleResp(SCliConn* conn) {
if (pCtx == NULL || pCtx->pSem == NULL) { if (pCtx == NULL || pCtx->pSem == NULL) {
tTrace("%s cli conn %p handle resp", pTransInst->label, conn); tTrace("%s cli conn %p handle resp", pTransInst->label, conn);
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); cliAppCb(conn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn); tTrace("%s cli conn(sync) %p handle resp", pTransInst->label, conn);
memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg)); memcpy((char*)pCtx->pRsp, (char*)&transMsg, sizeof(transMsg));
...@@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) { ...@@ -384,7 +380,8 @@ void cliHandleExcept(SCliConn* pConn) {
once = true; once = true;
continue; continue;
} }
(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL); cliAppCb(pConn, &transMsg);
//(pTransInst->cfp)(pTransInst->parent, &transMsg, NULL);
} else { } else {
tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn); tTrace("%s cli conn(sync) %p handle except", pTransInst->label, pConn);
memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg)); memcpy((char*)(pCtx->pRsp), (char*)(&transMsg), sizeof(transMsg));
...@@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) { ...@@ -884,6 +881,18 @@ int cliRBChoseIdx(STrans* pTransInst) {
} }
return index % pTransInst->numOfThreads; return index % pTransInst->numOfThreads;
} }
void cliAppCb(SCliConn* pConn, STransMsg* transMsg) {
SCliThrdObj* pThrd = pConn->hostThrd;
STrans* pTransInst = pThrd->pTransInst;
if (transMsg->code == TSDB_CODE_RPC_REDIRECT && pTransInst->retry != NULL) {
SMEpSet emsg = {0};
tDeserializeSMEpSet(transMsg->pCont, transMsg->contLen, &emsg);
pTransInst->retry(pTransInst, transMsg, &(emsg.epSet));
} else {
pTransInst->cfp(pTransInst->parent, transMsg, NULL);
}
}
void transCloseClient(void* arg) { void transCloseClient(void* arg) {
SCliObj* cli = arg; SCliObj* cli = arg;
......
...@@ -30,7 +30,6 @@ typedef struct SSrvConn { ...@@ -30,7 +30,6 @@ typedef struct SSrvConn {
uv_timer_t pTimer; uv_timer_t pTimer;
queue queue; queue queue;
int ref;
int persist; // persist connection or not int persist; // persist connection or not
SConnBuffer readBuf; // read buf, SConnBuffer readBuf; // read buf,
int inType; int inType;
...@@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) { ...@@ -692,8 +691,6 @@ static void uvDestroyConn(uv_handle_t* handle) {
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
tTrace("work thread quit"); tTrace("work thread quit");
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} }
} }
...@@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { ...@@ -756,8 +753,6 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true; thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) { if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_walk(thrd->loop, uvWalkCb, NULL); uv_walk(thrd->loop, uvWalkCb, NULL);
// uv_loop_close(thrd->loop);
// uv_stop(thrd->loop);
} else { } else {
destroyAllConn(thrd); destroyAllConn(thrd);
} }
...@@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) { ...@@ -851,10 +846,8 @@ void transRefSrvHandle(void* handle) {
if (handle == NULL) { if (handle == NULL) {
return; return;
} }
SSrvConn* conn = handle;
int ref = T_REF_INC((SSrvConn*)handle); int ref = T_REF_INC((SSrvConn*)handle);
UNUSED(ref); tDebug("server conn %p ref count: %d", handle, ref);
} }
void transUnrefSrvHandle(void* handle) { void transUnrefSrvHandle(void* handle) {
......
...@@ -37,6 +37,7 @@ int64_t tsOpenMax = 0; ...@@ -37,6 +37,7 @@ int64_t tsOpenMax = 0;
int64_t tsStreamMax = 0; int64_t tsStreamMax = 0;
float tsNumOfCores = 0; float tsNumOfCores = 0;
int64_t tsTotalMemoryKB = 0; int64_t tsTotalMemoryKB = 0;
char* tsProcPath = NULL;
void osDefaultInit() { void osDefaultInit() {
taosSeedRand(taosSafeRand()); taosSeedRand(taosSafeRand());
......
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
static char *tsProcPath = NULL; char *tsProcPath = NULL;
int32_t taosNewProc(char **args) { int32_t taosNewProc(char **args) {
int32_t pid = fork(); int32_t pid = fork();
......
...@@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size ...@@ -229,22 +229,21 @@ void *taosbsearch(const void *key, const void *base, int64_t nmemb, int64_t size
} }
void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar, void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const void *parcompar,
__ext_compar_fn_t compar, const void *parswap, __ext_swap_fn_t swap, bool maxroot) { __ext_compar_fn_t compar, char* buf, bool maxroot) {
int32_t parent; int32_t parent;
int32_t child; int32_t child;
char *buf;
char* tmp = NULL;
if (buf == NULL) {
tmp = taosMemoryMalloc(size);
} else {
tmp = buf;
}
if (base && size > 0 && compar) { if (base && size > 0 && compar) {
parent = start; parent = start;
child = 2 * parent + 1; child = 2 * parent + 1;
if (swap == NULL) {
buf = taosMemoryCalloc(1, size);
if (buf == NULL) {
return;
}
}
if (maxroot) { if (maxroot) {
while (child <= end) { while (child <= end) {
if (child + 1 <= end && if (child + 1 <= end &&
...@@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -256,11 +255,7 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
break; break;
} }
if (swap == NULL) { doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
} else {
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
}
parent = child; parent = child;
child = 2 * parent + 1; child = 2 * parent + 1;
...@@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const ...@@ -276,33 +271,35 @@ void taosheapadjust(void *base, int32_t size, int32_t start, int32_t end, const
break; break;
} }
if (swap == NULL) { doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, tmp);
doswap(elePtrAt(base, size, parent), elePtrAt(base, size, child), size, buf);
} else {
(*swap)(elePtrAt(base, size, parent), elePtrAt(base, size, child), parswap);
}
parent = child; parent = child;
child = 2 * parent + 1; child = 2 * parent + 1;
} }
} }
}
if (swap == NULL) { if (buf == NULL) {
taosMemoryFreeClear(buf); taosMemoryFree(tmp);
}
} }
} }
void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar, void taosheapsort(void *base, int32_t size, int32_t len, const void *parcompar, __ext_compar_fn_t compar,
const void *parswap, __ext_swap_fn_t swap, bool maxroot) { bool maxroot) {
int32_t i; int32_t i;
char* buf = taosMemoryCalloc(1, size);
if (buf == NULL) {
return;
}
if (base && size > 0) { if (base && size > 0) {
for (i = len / 2 - 1; i >= 0; i--) { for (i = len / 2 - 1; i >= 0; i--) {
taosheapadjust(base, size, i, len - 1, parcompar, compar, parswap, swap, maxroot); taosheapadjust(base, size, i, len - 1, parcompar, compar, buf, maxroot);
} }
} }
taosMemoryFree(buf);
/* /*
char *buf = taosMemoryCalloc(1, size); char *buf = taosMemoryCalloc(1, size);
......
...@@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then ...@@ -131,8 +131,8 @@ if [ -n "$FILE_NAME" ]; then
FLAG="-v" FLAG="-v"
fi fi
echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG echo valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG
valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --child-silent-after-fork=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes --log-file=${LOG_DIR}/valgrind-tsim.log $PROGRAM -c $CFG_DIR -f $FILE_NAME $FLAG
else else
if [[ $MULTIPROCESS -eq 1 ]];then if [[ $MULTIPROCESS -eq 1 ]];then
echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME echo "ExcuteCmd(multiprocess):" $PROGRAM -m -c $CFG_DIR -f $FILE_NAME
......
...@@ -279,7 +279,8 @@ endi ...@@ -279,7 +279,8 @@ endi
print ================> syntax error check not active ================> reactive print ================> syntax error check not active ================> reactive
sql_error select * from dev_001 session(ts,1w) sql_error select * from dev_001 session(ts,1w)
sql_error select count(*) from st session(ts,1w) print disable this temporarily, session can not be directly applied to super table.
#sql_error select count(*) from st session(ts,1w)
sql_error select count(*) from dev_001 group by tagtype session(ts,1w) sql_error select count(*) from dev_001 group by tagtype session(ts,1w)
sql_error sql select count(*) from dev_001 session(ts,1n) sql_error sql select count(*) from dev_001 session(ts,1n)
sql_error sql select count(*) from dev_001 session(ts,1y) sql_error sql select count(*) from dev_001 session(ts,1y)
......
...@@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) { ...@@ -224,27 +224,63 @@ int32_t shellRunCommand(TAOS *con, char *command) {
} }
} }
char quote = 0, *cmd = command; bool esc = false;
char quote = 0, *cmd = command, *p = command;
for (char c = *command++; c != 0; c = *command++) { for (char c = *command++; c != 0; c = *command++) {
if (c == '\\' && (*command == '\'' || *command == '"' || *command == '`')) { if (esc) {
command ++; switch (c) {
case 'n':
c = '\n';
break;
case 'r':
c = '\r';
break;
case 't':
c = '\t';
break;
case 'G':
*p++ = '\\';
break;
case '\'':
case '"':
if (quote) {
*p++ = '\\';
}
break;
}
*p++ = c;
esc = false;
continue; continue;
} }
if (c == '\\') {
if (quote != 0 && (*command == '_' || *command == '\\')) {
// DO nothing
} else {
esc = true;
continue;
}
}
if (quote == c) { if (quote == c) {
quote = 0; quote = 0;
} else if (quote == 0 && (c == '\'' || c == '"' || c == '`')) { } else if (quote == 0 && (c == '\'' || c == '"')) {
quote = c; quote = c;
} else if (c == ';' && quote == 0) { }
c = *command;
*command = 0; *p++ = c;
if (c == ';' && quote == 0) {
c = *p;
*p = 0;
if (shellRunSingleCommand(con, cmd) < 0) { if (shellRunSingleCommand(con, cmd) < 0) {
return -1; return -1;
} }
*command = c; *p = c;
cmd = command; p = cmd;
} }
} }
*p = 0;
return shellRunSingleCommand(con, cmd); return shellRunSingleCommand(con, cmd);
} }
...@@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) { ...@@ -538,23 +574,19 @@ static void shellPrintNChar(const char *str, int length, int width) {
while (pos < length) { while (pos < length) {
TdWchar wc; TdWchar wc;
int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); int bytes = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
if (bytes <= 0) { if (bytes == 0) {
break; break;
} }
if (pos + bytes > length) { pos += bytes;
if (pos > length) {
break; break;
} }
int w = 0;
#ifdef WINDOWS #ifdef WINDOWS
w = bytes; int w = bytes;
#else #else
if(*(str + pos) == '\t' || *(str + pos) == '\n' || *(str + pos) == '\r'){ int w = taosWcharWidth(wc);
w = bytes;
}else{
w = taosWcharWidth(wc);
}
#endif #endif
pos += bytes;
if (w <= 0) { if (w <= 0) {
continue; continue;
} }
...@@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le ...@@ -648,7 +680,6 @@ static void printField(const char *val, TAOS_FIELD *field, int width, int32_t le
break; break;
case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_BINARY:
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR:
case TSDB_DATA_TYPE_JSON:
shellPrintNChar(val, length, width); shellPrintNChar(val, length, width);
break; break;
case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_TIMESTAMP:
...@@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) { ...@@ -761,8 +792,7 @@ static int calcColWidth(TAOS_FIELD *field, int precision) {
return TMAX(field->bytes, width); return TMAX(field->bytes, width);
} }
case TSDB_DATA_TYPE_NCHAR: case TSDB_DATA_TYPE_NCHAR: {
case TSDB_DATA_TYPE_JSON:{
int16_t bytes = field->bytes * TSDB_NCHAR_SIZE; int16_t bytes = field->bytes * TSDB_NCHAR_SIZE;
if (bytes > tsMaxBinaryDisplayWidth) { if (bytes > tsMaxBinaryDisplayWidth) {
return TMAX(tsMaxBinaryDisplayWidth, width); return TMAX(tsMaxBinaryDisplayWidth, width);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册