提交 b282e39b 编写于 作者: haoranc's avatar haoranc

Merge branch '3.0' into fix/TD-19312

...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
# taosws-rs # taosws-rs
ExternalProject_Add(taosws-rs ExternalProject_Add(taosws-rs
GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git GIT_REPOSITORY https://github.com/taosdata/taos-connector-rust.git
GIT_TAG 1bdfca3 GIT_TAG 0373a70
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taosws-rs"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE
......
...@@ -339,10 +339,11 @@ typedef struct SUdfcProxy { ...@@ -339,10 +339,11 @@ typedef struct SUdfcProxy {
uv_mutex_t udfStubsMutex; uv_mutex_t udfStubsMutex;
SArray *udfStubs; // SUdfcFuncStub SArray *udfStubs; // SUdfcFuncStub
uv_mutex_t udfcUvMutex;
int8_t initialized; int8_t initialized;
} SUdfcProxy; } SUdfcProxy;
SUdfcProxy gUdfdProxy = {0}; SUdfcProxy gUdfcProxy = {0};
typedef struct SUdfcUvSession { typedef struct SUdfcUvSession {
SUdfcProxy *udfc; SUdfcProxy *udfc;
...@@ -896,23 +897,23 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) { ...@@ -896,23 +897,23 @@ int compareUdfcFuncSub(const void *elem1, const void *elem2) {
int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
int32_t code = 0; int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); int32_t stubIndex = taosArraySearchIdx(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) { if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); SUdfcFuncStub *foundStub = taosArrayGet(gUdfcProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle; UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle; *pHandle = foundStub->handle;
++foundStub->refCount; ++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs(); foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0; return 0;
} else { } else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName, fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName,
foundStub->refCount, foundStub->lastRefTime); foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); taosArrayRemove(gUdfcProxy.udfStubs, stubIndex);
} }
} }
*pHandle = NULL; *pHandle = NULL;
...@@ -923,46 +924,46 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -923,46 +924,46 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
stub.handle = *pHandle; stub.handle = *pHandle;
++stub.refCount; ++stub.refCount;
stub.lastRefTime = taosGetTimestampUs(); stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArrayPush(gUdfcProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); taosArraySort(gUdfcProxy.udfStubs, compareUdfcFuncSub);
} else { } else {
*pHandle = NULL; *pHandle = NULL;
} }
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return code; return code;
} }
void releaseUdfFuncHandle(char *udfName) { void releaseUdfFuncHandle(char *udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); SUdfcFuncStub *foundStub = taosArraySearch(gUdfcProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) { if (!foundStub) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return; return;
} }
if (foundStub->refCount > 0) { if (foundStub->refCount > 0) {
--foundStub->refCount; --foundStub->refCount;
} }
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
} }
int32_t cleanUpUdfs() { int32_t cleanUpUdfs() {
int8_t initialized = atomic_load_8(&gUdfdProxy.initialized); int8_t initialized = atomic_load_8(&gUdfcProxy.initialized);
if (!initialized) { if (!initialized) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfcProxy.udfStubsMutex);
if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) { if (gUdfcProxy.udfStubs == NULL || taosArrayGetSize(gUdfcProxy.udfStubs) == 0) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
int32_t i = 0; int32_t i = 0;
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { while (i < taosArrayGetSize(gUdfcProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); SUdfcFuncStub *stub = taosArrayGet(gUdfcProxy.udfStubs, i);
if (stub->refCount == 0) { if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
...@@ -979,9 +980,9 @@ int32_t cleanUpUdfs() { ...@@ -979,9 +980,9 @@ int32_t cleanUpUdfs() {
} }
++i; ++i;
} }
taosArrayDestroy(gUdfdProxy.udfStubs); taosArrayDestroy(gUdfcProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs; gUdfcProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfcProxy.udfStubsMutex);
return 0; return 0;
} }
...@@ -1157,9 +1158,11 @@ void onUdfcPipeClose(uv_handle_t *handle) { ...@@ -1157,9 +1158,11 @@ void onUdfcPipeClose(uv_handle_t *handle) {
QUEUE_REMOVE(&task->procTaskQueue); QUEUE_REMOVE(&task->procTaskQueue);
uv_sem_post(&task->taskSem); uv_sem_post(&task->taskSem);
} }
uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
if (conn->session != NULL) { if (conn->session != NULL) {
conn->session->udfUvPipe = NULL; conn->session->udfUvPipe = NULL;
} }
uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn->readBuf.buf);
taosMemoryFree(conn); taosMemoryFree(conn);
taosMemoryFree((uv_pipe_t *)handle); taosMemoryFree((uv_pipe_t *)handle);
...@@ -1553,11 +1556,11 @@ void constructUdfService(void *argsThread) { ...@@ -1553,11 +1556,11 @@ void constructUdfService(void *argsThread) {
} }
int32_t udfcOpen() { int32_t udfcOpen() {
int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 0, 1); int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 0, 1);
if (old == 1) { if (old == 1) {
return 0; return 0;
} }
SUdfcProxy *proxy = &gUdfdProxy; SUdfcProxy *proxy = &gUdfcProxy;
getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName)); getUdfdPipeName(proxy->udfdPipeName, sizeof(proxy->udfdPipeName));
proxy->udfcState = UDFC_STATE_STARTNG; proxy->udfcState = UDFC_STATE_STARTNG;
uv_barrier_init(&proxy->initBarrier, 2); uv_barrier_init(&proxy->initBarrier, 2);
...@@ -1567,16 +1570,17 @@ int32_t udfcOpen() { ...@@ -1567,16 +1570,17 @@ int32_t udfcOpen() {
uv_barrier_wait(&proxy->initBarrier); uv_barrier_wait(&proxy->initBarrier);
uv_mutex_init(&proxy->udfStubsMutex); uv_mutex_init(&proxy->udfStubsMutex);
proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub));
uv_mutex_init(&proxy->udfcUvMutex);
fnInfo("udfc initialized") return 0; fnInfo("udfc initialized") return 0;
} }
int32_t udfcClose() { int32_t udfcClose() {
int8_t old = atomic_val_compare_exchange_8(&gUdfdProxy.initialized, 1, 0); int8_t old = atomic_val_compare_exchange_8(&gUdfcProxy.initialized, 1, 0);
if (old == 0) { if (old == 0) {
return 0; return 0;
} }
SUdfcProxy *udfc = &gUdfdProxy; SUdfcProxy *udfc = &gUdfcProxy;
udfc->udfcState = UDFC_STATE_STOPPING; udfc->udfcState = UDFC_STATE_STOPPING;
uv_async_send(&udfc->loopStopAsync); uv_async_send(&udfc->loopStopAsync);
uv_thread_join(&udfc->loopThread); uv_thread_join(&udfc->loopThread);
...@@ -1584,6 +1588,7 @@ int32_t udfcClose() { ...@@ -1584,6 +1588,7 @@ int32_t udfcClose() {
uv_barrier_destroy(&udfc->initBarrier); uv_barrier_destroy(&udfc->initBarrier);
taosArrayDestroy(udfc->udfStubs); taosArrayDestroy(udfc->udfStubs);
uv_mutex_destroy(&udfc->udfStubsMutex); uv_mutex_destroy(&udfc->udfStubsMutex);
uv_mutex_destroy(&udfc->udfcUvMutex);
udfc->udfcState = UDFC_STATE_INITAL; udfc->udfcState = UDFC_STATE_INITAL;
fnInfo("udfc is cleaned up"); fnInfo("udfc is cleaned up");
return 0; return 0;
...@@ -1611,13 +1616,13 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) { ...@@ -1611,13 +1616,13 @@ int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType) {
} }
int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
if (gUdfdProxy.udfcState != UDFC_STATE_READY) { if (gUdfcProxy.udfcState != UDFC_STATE_READY) {
return TSDB_CODE_UDF_INVALID_STATE; return TSDB_CODE_UDF_INVALID_STATE;
} }
SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask));
task->errCode = 0; task->errCode = 0;
task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession));
task->session->udfc = &gUdfdProxy; task->session->udfc = &gUdfcProxy;
task->type = UDF_TASK_SETUP; task->type = UDF_TASK_SETUP;
SUdfSetupRequest *req = &task->_setup.req; SUdfSetupRequest *req = &task->_setup.req;
...@@ -1625,7 +1630,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1625,7 +1630,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT); int32_t errCode = udfcRunUdfUvTask(task, UV_TASK_CONNECT);
if (errCode != 0) { if (errCode != 0) {
fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfdProxy)->udfdPipeName); fnError("failed to connect to pipe. udfName: %s, pipe: %s", udfName, (&gUdfcProxy)->udfdPipeName);
taosMemoryFree(task->session); taosMemoryFree(task->session);
taosMemoryFree(task); taosMemoryFree(task);
return TSDB_CODE_UDF_PIPE_CONNECT_ERR; return TSDB_CODE_UDF_PIPE_CONNECT_ERR;
...@@ -1799,10 +1804,12 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { ...@@ -1799,10 +1804,12 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) {
fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle);
// TODO: synchronization refactor between libuv event loop and request thread // TODO: synchronization refactor between libuv event loop and request thread
uv_mutex_lock(&gUdfcProxy.udfcUvMutex);
if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) { if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) {
SClientUvConn *conn = session->udfUvPipe->data; SClientUvConn *conn = session->udfUvPipe->data;
conn->session = NULL; conn->session = NULL;
} }
uv_mutex_unlock(&gUdfcProxy.udfcUvMutex);
taosMemoryFree(session); taosMemoryFree(session);
taosMemoryFree(task); taosMemoryFree(task);
......
...@@ -424,7 +424,12 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) { ...@@ -424,7 +424,12 @@ static int32_t walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return -1; return -1;
} }
ASSERT(taosLSeekFile(pWal->pIdxFile, 0, SEEK_END) == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned"); // check alignment of idx entries
int64_t endOffset = taosLSeekFile(pWal->pIdxFile, 0, SEEK_END);
if (endOffset < 0) {
wFatal("vgId:%d, failed to seek end of idxfile due to %s. ver:%" PRId64 "", pWal->cfg.vgId, strerror(errno), ver);
}
ASSERT(endOffset == idxOffset + sizeof(SWalIdxEntry) && "Offset of idx entries misaligned");
return 0; return 0;
} }
......
因为 它太大了无法显示 source diff 。你可以改为 查看blob
...@@ -217,10 +217,11 @@ python3 ./test.py -f 0-others/compatibility.py ...@@ -217,10 +217,11 @@ python3 ./test.py -f 0-others/compatibility.py
# python3 ./test.py -f 2-query/json_tag.py # python3 ./test.py -f 2-query/json_tag.py
# # python3 ./test.py -f 2-query/nestedQuery.py # python3 ./test.py -f 2-query/nestedQuery.py
# # TD-15983 subquery output duplicate name column. # TD-15983 subquery output duplicate name column.
# # Please Xiangyang Guo modify the following script # Please Xiangyang Guo modify the following script
# # python3 ./test.py -f 2-query/nestedQuery_str.py # python3 ./test.py -f 2-query/nestedQuery_str.py
# python3 ./test.py -f 2-query/stablity.py
# python3 ./test.py -f 2-query/elapsed.py # python3 ./test.py -f 2-query/elapsed.py
# python3 ./test.py -f 2-query/csum.py # python3 ./test.py -f 2-query/csum.py
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册