未验证 提交 7ebf2d5d 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #17614 from taosdata/opt/optSysIdxPerf

enh: opt sys idx perf
...@@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) ...@@ -1062,7 +1062,7 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) {
goto END; goto END;
} }
bool first = true;
int32_t valid = 0; int32_t valid = 0;
while (1) { while (1) {
void *entryKey = NULL; void *entryKey = NULL;
...@@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) ...@@ -1074,7 +1074,13 @@ int32_t metaFilterCreateTime(SMeta *pMeta, SMetaFltParam *param, SArray *pUids)
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type);
if (cmp == 0) taosArrayPush(pUids, &p->uid); if (cmp == 0) taosArrayPush(pUids, &p->uid);
if (cmp == -1) break;
if (param->reverse == false) {
if (cmp == -1) break;
} else if (param->reverse) {
if (cmp == 1) break;
}
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur); valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break; if (valid < 0) break;
} }
......
...@@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) { ...@@ -572,8 +572,12 @@ static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
} }
static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
ncolKey->ncol = pME->ntbEntry.schemaRow.nCols; if (pME->type == TSDB_NORMAL_TABLE) {
ncolKey->uid = pME->uid; ncolKey->ncol = pME->ntbEntry.schemaRow.nCols;
ncolKey->uid = pME->uid;
} else {
return -1;
}
return 0; return 0;
} }
...@@ -777,9 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -777,9 +781,13 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION;
goto _err; goto _err;
} }
// search the column to add/drop/update // search the column to add/drop/update
pSchema = &entry.ntbEntry.schemaRow; pSchema = &entry.ntbEntry.schemaRow;
// save old entry
SMetaEntry oldEntry = {.type = TSDB_NORMAL_TABLE, .uid = entry.uid};
oldEntry.ntbEntry.schemaRow.nCols = pSchema->nCols;
int32_t iCol = 0; int32_t iCol = 0;
for (;;) { for (;;) {
pColumn = NULL; pColumn = NULL;
...@@ -872,6 +880,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -872,6 +880,9 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
entry.version = version; entry.version = version;
metaDeleteNcolIdx(pMeta, &oldEntry);
metaUpdateNcolIdx(pMeta, &entry);
// do actual write // do actual write
metaWLock(pMeta); metaWLock(pMeta);
......
...@@ -2881,7 +2881,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) { ...@@ -2881,7 +2881,7 @@ int optSysDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
default: default:
return -1; return -1;
} }
return 1; return cmp;
} }
static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) { static int optSysFilterFuncImpl__LowerThan(void* a, void* b, int16_t dtype) {
...@@ -2987,10 +2987,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) { ...@@ -2987,10 +2987,6 @@ static int32_t sysFilte__TableName(void* arg, SNode* pNode, SArray* result) {
.val = pVal->datum.p, .val = pVal->datum.p,
.reverse = reverse, .reverse = reverse,
.filterFunc = func}; .filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result);
if (ret == 0) return 0;
return -1; return -1;
} }
...@@ -3002,15 +2998,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) { ...@@ -3002,15 +2998,17 @@ static int32_t sysFilte__CreateTime(void* arg, SNode* pNode, SArray* result) {
bool reverse = false; bool reverse = false;
__optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse); __optSysFilter func = optSysGetFilterFunc(pOper->opType, &reverse);
SMetaFltParam param = {.suid = 0,
.cid = 0,
.type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i,
.reverse = reverse,
.filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result);
if (func == NULL) return -1; if (func == NULL) return -1;
return 0;
SMetaFltParam param = {.suid = 0,
.cid = 0,
.type = TSDB_DATA_TYPE_BIGINT,
.val = &pVal->datum.i,
.reverse = reverse,
.filterFunc = func};
int32_t ret = metaFilterCreateTime(pMeta, &param, result);
return ret;
} }
static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) { static int32_t sysFilte__Ncolumn(void* arg, SNode* pNode, SArray* result) {
void* pMeta = ((SSTabFltArg*)arg)->pMeta; void* pMeta = ((SSTabFltArg*)arg)->pMeta;
...@@ -3073,7 +3071,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) { ...@@ -3073,7 +3071,7 @@ static int32_t sysChkFilter__Comm(SNode* pNode) {
SOperatorNode* pOper = (SOperatorNode*)pNode; SOperatorNode* pOper = (SOperatorNode*)pNode;
EOperatorType opType = pOper->opType; EOperatorType opType = pOper->opType;
if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN && if (opType != OP_TYPE_EQUAL && opType != OP_TYPE_LOWER_EQUAL && opType != OP_TYPE_LOWER_THAN &&
OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) { opType != OP_TYPE_GREATER_EQUAL && opType != OP_TYPE_GREATER_THAN) {
return -1; return -1;
} }
return 0; return 0;
......
...@@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { ...@@ -374,7 +374,11 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
} else { } else {
tError("fail to dispatch conn to work thread"); tError("fail to dispatch conn to work thread");
} }
uv_close((uv_handle_t*)req->data, uvFreeCb); if (!uv_is_closing((uv_handle_t*)req->data)) {
uv_close((uv_handle_t*)req->data, uvFreeCb);
} else {
taosMemoryFree(req->data);
}
taosMemoryFree(req); taosMemoryFree(req);
} }
...@@ -651,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -651,12 +655,14 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_tcp_init(pObj->loop, cli); uv_tcp_init(pObj->loop, cli);
if (uv_accept(stream, (uv_stream_t*)cli) == 0) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
#ifdef WINDOWS
if (pObj->numOfWorkerReady < pObj->numOfThreads) { if (pObj->numOfWorkerReady < pObj->numOfThreads) {
tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads, tError("worker-threads are not ready for all, need %d instead of %d.", pObj->numOfThreads,
pObj->numOfWorkerReady); pObj->numOfWorkerReady);
uv_close((uv_handle_t*)cli, NULL); uv_close((uv_handle_t*)cli, NULL);
return; return;
} }
#endif
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
wr->data = cli; wr->data = cli;
...@@ -668,7 +674,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { ...@@ -668,7 +674,11 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
} else { } else {
uv_close((uv_handle_t*)cli, NULL); if (!uv_is_closing((uv_handle_t*)cli)) {
uv_close((uv_handle_t*)cli, NULL);
} else {
taosMemoryFree(cli);
}
} }
} }
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
...@@ -681,7 +691,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { ...@@ -681,7 +691,6 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
tWarn("failed to create connect:%p", q); tWarn("failed to create connect:%p", q);
taosMemoryFree(buf->base); taosMemoryFree(buf->base);
uv_close((uv_handle_t*)q, NULL); uv_close((uv_handle_t*)q, NULL);
// taosMemoryFree(q);
return; return;
} }
// free memory allocated by // free memory allocated by
...@@ -770,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -770,7 +779,12 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
return false; return false;
} }
#ifdef WINDOWS
uv_pipe_init(pThrd->loop, pThrd->pipe, 1); uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
#else
uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
uv_pipe_open(pThrd->pipe, pThrd->fd);
#endif
pThrd->pipe->data = pThrd; pThrd->pipe->data = pThrd;
...@@ -785,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) { ...@@ -785,8 +799,11 @@ static bool addHandleToWorkloop(SWorkThrd* pThrd, char* pipeName) {
QUEUE_INIT(&pThrd->conn); QUEUE_INIT(&pThrd->conn);
pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb); pThrd->asyncPool = transAsyncPoolCreate(pThrd->loop, 5, pThrd, uvWorkerAsyncCb);
#ifdef WINDOWS
uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb); uv_pipe_connect(&pThrd->connect_req, pThrd->pipe, pipeName, uvOnPipeConnectionCb);
// uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); #else
uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
#endif
return true; return true;
} }
...@@ -958,20 +975,19 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -958,20 +975,19 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
srv->port = port; srv->port = port;
uv_loop_init(srv->loop); uv_loop_init(srv->loop);
char pipeName[PATH_MAX];
#ifdef WINDOWS
int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0); int ret = uv_pipe_init(srv->loop, &srv->pipeListen, 0);
if (ret != 0) { if (ret != 0) {
tError("failed to init pipe, errmsg: %s", uv_err_name(ret)); tError("failed to init pipe, errmsg: %s", uv_err_name(ret));
goto End; goto End;
} }
#ifdef WINDOWS
char pipeName[64];
snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId()); snprintf(pipeName, sizeof(pipeName), "\\\\?\\pipe\\trans.rpc.%d-%" PRIu64, taosSafeRand(), GetCurrentProcessId());
#else // char pipeName[PATH_MAX] = {0};
char pipeName[PATH_MAX] = {0}; // snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(),
snprintf(pipeName, sizeof(pipeName), "%s%spipe.trans.rpc.%08d-%" PRIu64, tsTempDir, TD_DIRSEP, taosSafeRand(), // taosGetSelfPthreadId());
taosGetSelfPthreadId());
#endif
ret = uv_pipe_bind(&srv->pipeListen, pipeName); ret = uv_pipe_bind(&srv->pipeListen, pipeName);
if (ret != 0) { if (ret != 0) {
tError("failed to bind pipe, errmsg: %s", uv_err_name(ret)); tError("failed to bind pipe, errmsg: %s", uv_err_name(ret));
...@@ -997,6 +1013,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -997,6 +1013,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
if (false == addHandleToWorkloop(thrd, pipeName)) { if (false == addHandleToWorkloop(thrd, pipeName)) {
goto End; goto End;
} }
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd)); int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
if (err == 0) { if (err == 0) {
tDebug("success to create worker-thread:%d", i); tDebug("success to create worker-thread:%d", i);
...@@ -1006,14 +1023,54 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -1006,14 +1023,54 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End; goto End;
} }
} }
#else
for (int i = 0; i < srv->numOfThreads; i++) {
SWorkThrd* thrd = (SWorkThrd*)taosMemoryCalloc(1, sizeof(SWorkThrd));
thrd->pTransInst = shandle;
thrd->quit = false;
thrd->pTransInst = shandle;
srv->pipe[i] = (uv_pipe_t*)taosMemoryCalloc(2, sizeof(uv_pipe_t));
srv->pThreadObj[i] = thrd;
uv_os_sock_t fds[2];
if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
goto End;
}
uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
uv_pipe_open(&(srv->pipe[i][0]), fds[1]);
thrd->pipe = &(srv->pipe[i][1]); // init read
thrd->fd = fds[0];
if (false == addHandleToWorkloop(thrd, pipeName)) {
goto End;
}
int err = taosThreadCreate(&(thrd->thread), NULL, transWorkerThread, (void*)(thrd));
if (err == 0) {
tDebug("success to create worker-thread:%d", i);
} else {
// TODO: clear all other resource later
tError("failed to create worker-thread:%d", i);
goto End;
}
}
#endif
if (false == taosValidIpAndPort(srv->ip, srv->port)) { if (false == taosValidIpAndPort(srv->ip, srv->port)) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr()); tError("invalid ip/port, %d:%d, reason:%s", srv->ip, srv->port, terrstr());
goto End; goto End;
} }
if (false == addHandleToAcceptloop(srv)) { if (false == addHandleToAcceptloop(srv)) {
goto End; goto End;
} }
int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv); int err = taosThreadCreate(&srv->thread, NULL, transAcceptThread, (void*)srv);
if (err == 0) { if (err == 0) {
tDebug("success to create accept-thread"); tDebug("success to create accept-thread");
...@@ -1022,6 +1079,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, ...@@ -1022,6 +1079,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
goto End; goto End;
// clear all resource later // clear all resource later
} }
srv->inited = true; srv->inited = true;
return srv; return srv;
End: End:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册