diff --git a/docs/en/07-develop/09-udf.md b/docs/en/07-develop/09-udf.md index b0124b9c3fb93bfaa30bdba6a344acda0a2c7c3f..5a98ff2789037693e479c12129b530d97cc0ad3e 100644 --- a/docs/en/07-develop/09-udf.md +++ b/docs/en/07-develop/09-udf.md @@ -10,7 +10,7 @@ User-defined functions can be scalar functions or aggregate functions. Scalar fu TDengine supports user-defined functions written in C or Python. This document describes the usage of user-defined functions. -## Implement a UDF in C +## Implement a UDF in C When you create a user-defined function, you must implement standard interface functions: - For scalar functions, implement the `scalarfn` interface function. @@ -111,13 +111,13 @@ Interface functions return a value that indicates whether the operation was succ For information about the parameters for interface functions, see Data Model #### Scalar Interface - `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)` - + `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)` + Replace `scalarfn` with the name of your function. This function performs scalar calculations on data blocks. You can configure a value through the parameters in the `resultColumn` structure. The parameters in the function are defined as follows: - inputDataBlock: The data block to input. - - resultColumn: The column to output. The column to output. + - resultColumn: The column to output. The column to output. #### Aggregate Interface @@ -197,7 +197,7 @@ The data structure is described as follows: - The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn. - SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData). - The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`. -- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data. +- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data. - SUdfInterBuf defines the intermediate structure `buffer` and the number of results in the buffer `numOfResult`. Additional functions are defined in `taosudf.h` to make it easier to work with these structures. @@ -285,9 +285,9 @@ def init(): def destroy(): # destroy def process(input: datablock) -> tuple[output_type]: - # process input datablock, + # process input datablock, # datablock.data(row, col) is to access the python object in location(row,col) - # return tuple object consisted of object of type outputtype + # return tuple object consisted of object of type outputtype ``` ### Implement an Aggregate UDF in Python @@ -303,12 +303,12 @@ def start() -> bytes: #return serialize(init_state) def reduce(inputs: datablock, buf: bytes) -> bytes # deserialize buf to state - # reduce the inputs and state into new_state. - # use inputs.data(i,j) to access python ojbect of location(i,j) + # reduce the inputs and state into new_state. + # use inputs.data(i,j) to access python object of location(i,j) # serialize new_state into new_state_bytes - return new_state_bytes + return new_state_bytes def finish(buf: bytes) -> output_type: - #return obj of type outputtype + #return obj of type outputtype ``` ### Python UDF Interface Definition @@ -328,7 +328,7 @@ def finish(buf: bytes) -> output_type: ``` - first `start()` is called to return the initial result in type `bytes` -- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer. +- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer. - finally, the `finish` function is called on the intermediate result `buf` and outputs 0 or 1 data of type `output_type` @@ -337,7 +337,7 @@ def finish(buf: bytes) -> output_type: def init() def destroy() ``` -Implement `init` for initialization and `destroy` for termination. +Implement `init` for initialization and `destroy` for termination. ### Data Mapping between TDengine SQL and Python UDF @@ -360,7 +360,7 @@ sudo pip install taospyudf ldconfig ``` 2. If PYTHONPATH is needed to find Python packages when the Python UDF executes, include the PYTHONPATH contents into the udfdLdLibPath variable of the taos.cfg configuration file - + ### Python UDF Sample Code #### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py) diff --git a/docs/en/14-reference/12-config/index.md b/docs/en/14-reference/12-config/index.md index 3ce63fb6cceded0f309691ed47ad8bda38db5bfa..52ded6208a5f946dd0901410605907c826813226 100644 --- a/docs/en/14-reference/12-config/index.md +++ b/docs/en/14-reference/12-config/index.md @@ -111,7 +111,7 @@ The parameters described in this document by the effect that they have on the sy | Attribute | Description | | ------------- | ---------------------------------------------- | | Applicable | Client/Server | -| Meaning | The maximum waiting time to get avaliable conn | +| Meaning | The maximum waiting time to get available conn | | Value Range | 10-50000000(ms) | | Default Value | 500000 | diff --git a/docs/zh/07-develop/09-udf.md b/docs/zh/07-develop/09-udf.md index 92f5d2a857ec3106c53258adda2a1aaaf0b3bb4c..e43275d5e04818eff8ed0eddf76bd7c768b315f5 100644 --- a/docs/zh/07-develop/09-udf.md +++ b/docs/zh/07-develop/09-udf.md @@ -303,7 +303,7 @@ def start() -> bytes: def reduce(inputs: datablock, buf: bytes) -> bytes # deserialize buf to state # reduce the inputs and state into new_state. - # use inputs.data(i,j) to access python ojbect of location(i,j) + # use inputs.data(i,j) to access python object of location(i,j) # serialize new_state into new_state_bytes return new_state_bytes def finish(buf: bytes) -> output_type: diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 35a6838b2e0f36ba36ccd41c4e8dd175e2efe7c9..a08152714408280389b2d76d9dcb5029259109d6 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -149,6 +149,7 @@ struct SWalReader { TdFilePtr pIdxFile; int64_t curFileFirstVer; int64_t curVersion; + int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data int64_t capacity; TdThreadMutex mutex; SWalFilterCond cond; @@ -200,6 +201,8 @@ int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver); int32_t walNextValidMsg(SWalReader *pRead); int64_t walReaderGetCurrentVer(const SWalReader *pReader); int64_t walReaderGetValidFirstVer(const SWalReader *pReader); +int64_t walReaderGetSkipToVersion(SWalReader *pReader); +void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver); void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever); void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bde53dca2d8b6d3b40c1108fd9cafa767e4ec592..77facb0d4272f8e65fc8938cc8e878e06bb7b61c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1304,19 +1304,22 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg; + + int32_t vgId = pTq->pStreamMeta->vgId; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask) { atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus); // no lock needs to secure the access of the version - if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended. - pTask->chkInfo.currentVer = sversion; - walReaderSeekVer(pTask->exec.pWalReader, sversion); - tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { + // discard all the data when the stream task is suspended. + walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion); + tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64 + ", schedStatus:%d", + vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } else { // from the previous paused version and go on - tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId, - pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); + tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", + vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus); } if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) { @@ -1325,6 +1328,8 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamSchedExec(pTask); } streamMetaReleaseTask(pTq->pStreamMeta, pTask); + } else { + tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId); } return 0; @@ -1433,7 +1438,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); if (numOfTasks == 0) { tqInfo("vgId:%d no stream tasks exist", vgId); - taosWUnLockLatch(&pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); return 0; } @@ -1441,7 +1446,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { if (pMeta->walScanCounter > 1) { tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter); - taosWUnLockLatch(&pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); return 0; } @@ -1449,7 +1454,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { if (pRunReq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr()); - taosWUnLockLatch(&pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); return -1; } @@ -1460,7 +1465,7 @@ int32_t tqStartStreamTasks(STQ* pTq) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg); - taosWUnLockLatch(&pTq->pStreamMeta->lock); + taosWUnLockLatch(&pMeta->lock); return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 6e3036a66ba73c60b421ad73a2a6fb7335a3803f..fe80f486918413390ee7916fb97fe07c58a1b80d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -87,6 +87,16 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { } } + int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader); + if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer); + if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit + return code; + } + + tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index c29d82bcf394b65beb5dcd94e6d35835e0a7cc60..0eb3d6ef09e8c81b981d69803b9bdb5f335b721c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -108,6 +108,14 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; } int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); } +void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) { atomic_store_64(&pReader->skipToVersion, ver); } + +// this function is NOT multi-thread safe, and no need to be. +int64_t walReaderGetSkipToVersion(SWalReader *pReader) { + int64_t newVersion = pReader->skipToVersion; + pReader->skipToVersion = 0; + return newVersion; +} void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) { *sver = walGetFirstVer(pReader->pWal);