提交 9d031f4d 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into enh/tsdb_optimize

......@@ -10,7 +10,7 @@ User-defined functions can be scalar functions or aggregate functions. Scalar fu
TDengine supports user-defined functions written in C or Python. This document describes the usage of user-defined functions.
## Implement a UDF in C
## Implement a UDF in C
When you create a user-defined function, you must implement standard interface functions:
- For scalar functions, implement the `scalarfn` interface function.
......@@ -111,13 +111,13 @@ Interface functions return a value that indicates whether the operation was succ
For information about the parameters for interface functions, see Data Model
#### Scalar Interface
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
`int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
Replace `scalarfn` with the name of your function. This function performs scalar calculations on data blocks. You can configure a value through the parameters in the `resultColumn` structure.
The parameters in the function are defined as follows:
- inputDataBlock: The data block to input.
- resultColumn: The column to output. The column to output.
- resultColumn: The column to output. The column to output.
#### Aggregate Interface
......@@ -197,7 +197,7 @@ The data structure is described as follows:
- The SUdfDataBlock block includes the number of rows (numOfRows) and the number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn.
- SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData).
- The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.
- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data.
- SUdfInterBuf defines the intermediate structure `buffer` and the number of results in the buffer `numOfResult`.
Additional functions are defined in `taosudf.h` to make it easier to work with these structures.
......@@ -285,9 +285,9 @@ def init():
def destroy():
# destroy
def process(input: datablock) -> tuple[output_type]:
# process input datablock,
# process input datablock,
# datablock.data(row, col) is to access the python object in location(row,col)
# return tuple object consisted of object of type outputtype
# return tuple object consisted of object of type outputtype
```
### Implement an Aggregate UDF in Python
......@@ -303,12 +303,12 @@ def start() -> bytes:
#return serialize(init_state)
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python object of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
#return obj of type outputtype
#return obj of type outputtype
```
### Python UDF Interface Definition
......@@ -328,7 +328,7 @@ def finish(buf: bytes) -> output_type:
```
- first `start()` is called to return the initial result in type `bytes`
- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer.
- then the input data are divided into multiple data blocks and for each block `input`, `reduce` is called with the data block `input` and the current result `buf` bytes and generates a new intermediate result buffer.
- finally, the `finish` function is called on the intermediate result `buf` and outputs 0 or 1 data of type `output_type`
......@@ -337,7 +337,7 @@ def finish(buf: bytes) -> output_type:
def init()
def destroy()
```
Implement `init` for initialization and `destroy` for termination.
Implement `init` for initialization and `destroy` for termination.
### Data Mapping between TDengine SQL and Python UDF
......@@ -360,7 +360,7 @@ sudo pip install taospyudf
ldconfig
```
2. If PYTHONPATH is needed to find Python packages when the Python UDF executes, include the PYTHONPATH contents into the udfdLdLibPath variable of the taos.cfg configuration file
### Python UDF Sample Code
#### Scalar Function [pybitand](https://github.com/taosdata/TDengine/blob/3.0/tests/script/sh/pybitand.py)
......
......@@ -111,7 +111,7 @@ The parameters described in this document by the effect that they have on the sy
| Attribute | Description |
| ------------- | ---------------------------------------------- |
| Applicable | Client/Server |
| Meaning | The maximum waiting time to get avaliable conn |
| Meaning | The maximum waiting time to get available conn |
| Value Range | 10-50000000(ms) |
| Default Value | 500000 |
......
......@@ -303,7 +303,7 @@ def start() -> bytes:
def reduce(inputs: datablock, buf: bytes) -> bytes
# deserialize buf to state
# reduce the inputs and state into new_state.
# use inputs.data(i,j) to access python ojbect of location(i,j)
# use inputs.data(i,j) to access python object of location(i,j)
# serialize new_state into new_state_bytes
return new_state_bytes
def finish(buf: bytes) -> output_type:
......
......@@ -149,6 +149,7 @@ struct SWalReader {
TdFilePtr pIdxFile;
int64_t curFileFirstVer;
int64_t curVersion;
int64_t skipToVersion; // skip data and jump to destination version, usually used by stream resume ignoring untreated data
int64_t capacity;
TdThreadMutex mutex;
SWalFilterCond cond;
......@@ -200,6 +201,8 @@ int32_t walReaderSeekVer(SWalReader *pRead, int64_t ver);
int32_t walNextValidMsg(SWalReader *pRead);
int64_t walReaderGetCurrentVer(const SWalReader *pReader);
int64_t walReaderGetValidFirstVer(const SWalReader *pReader);
int64_t walReaderGetSkipToVersion(SWalReader *pReader);
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver);
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever);
void walReaderVerifyOffset(SWalReader *pWalReader, STqOffsetVal* pOffset);
......
......@@ -1304,19 +1304,22 @@ int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId);
if (pTask) {
atomic_store_8(&pTask->status.taskStatus, pTask->status.keepTaskStatus);
// no lock needs to secure the access of the version
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) { // discard all the data when the stream task is suspended.
pTask->chkInfo.currentVer = sversion;
walReaderSeekVer(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to normal from the latest version:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
if (pReq->igUntreated && pTask->taskLevel == TASK_LEVEL__SOURCE) {
// discard all the data when the stream task is suspended.
walReaderSetSkipToVersion(pTask->exec.pWalReader, sversion);
tqDebug("vgId:%d s-task:%s resume to exec, prev paused version:%" PRId64 ", start from vnode ver:%" PRId64
", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
} else { // from the previous paused version and go on
tqDebug("vgId:%d s-task:%s resume to normal from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d", pTq->pStreamMeta->vgId,
pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
tqDebug("vgId:%d s-task:%s resume to exec, from paused ver:%" PRId64 ", vnode ver:%" PRId64 ", schedStatus:%d",
vgId, pTask->id.idStr, pTask->chkInfo.currentVer, sversion, pTask->status.schedStatus);
}
if (pTask->taskLevel == TASK_LEVEL__SOURCE && taosQueueItemSize(pTask->inputQueue->queue) == 0) {
......@@ -1325,6 +1328,8 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
streamSchedExec(pTask);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
} else {
tqError("vgId:%d failed to find the s-task:0x%x for resume stream task", vgId, pReq->taskId);
}
return 0;
......@@ -1433,7 +1438,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqInfo("vgId:%d no stream tasks exist", vgId);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -1441,7 +1446,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pMeta->walScanCounter > 1) {
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->walScanCounter);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -1449,7 +1454,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return -1;
}
......@@ -1460,7 +1465,7 @@ int32_t tqStartStreamTasks(STQ* pTq) {
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
taosWUnLockLatch(&pTq->pStreamMeta->lock);
taosWUnLockLatch(&pMeta->lock);
return 0;
}
......@@ -87,6 +87,16 @@ static int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
}
}
int64_t skipToVer = walReaderGetSkipToVersion(pTask->exec.pWalReader);
if (skipToVer != 0 && skipToVer > pTask->chkInfo.currentVer) {
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, skipToVer);
if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit
return code;
}
tqDebug("vgId:%d s-task:%s wal reader jump to ver:%" PRId64, vgId, pTask->id.idStr, skipToVer);
}
return TSDB_CODE_SUCCESS;
}
......
......@@ -108,6 +108,14 @@ int32_t walNextValidMsg(SWalReader *pReader) {
int64_t walReaderGetCurrentVer(const SWalReader *pReader) { return pReader->curVersion; }
int64_t walReaderGetValidFirstVer(const SWalReader *pReader) { return walGetFirstVer(pReader->pWal); }
void walReaderSetSkipToVersion(SWalReader *pReader, int64_t ver) { atomic_store_64(&pReader->skipToVersion, ver); }
// this function is NOT multi-thread safe, and no need to be.
int64_t walReaderGetSkipToVersion(SWalReader *pReader) {
int64_t newVersion = pReader->skipToVersion;
pReader->skipToVersion = 0;
return newVersion;
}
void walReaderValidVersionRange(SWalReader *pReader, int64_t *sver, int64_t *ever) {
*sver = walGetFirstVer(pReader->pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册