提交 0776427f 编写于 作者: S slzhou

runUdf passed

上级 f839f1de
......@@ -68,17 +68,19 @@ typedef struct SUdfColumnData {
int32_t numOfRows;
bool varLengthColumn;
union {
int32_t nullBitmapLen;
char* nullBitmap;
int32_t dataLen;
char* data;
};
union {
int32_t varOffsetsLen;
char* varOffsets;
int32_t payloadLen;
char* payload;
struct {
int32_t nullBitmapLen;
char *nullBitmap;
int32_t dataLen;
char *data;
} fixLenCol;
struct {
int32_t varOffsetsLen;
char *varOffsets;
int32_t payloadLen;
char *payload;
} varLenCol;
};
} SUdfColumnData;
......@@ -136,7 +138,7 @@ typedef int32_t (*TUdfTeardownFunc)();
//typedef int32_t addFixedLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t colBytes, char* data);
//typedef int32_t addVariableLengthColumnData(SColumnData *columnData, int rowIndex, bool isNull, int32_t dataLen, char * data);
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* columnData);
typedef int32_t (*TUdfFreeUdfColumnFunc)(SUdfColumn* column);
typedef int32_t (*TUdfScalarProcFunc)(SUdfDataBlock block, SUdfColumn *resultCol);
typedef int32_t (*TUdfAggInitFunc)(SUdfInterBuf *buf);
......
......@@ -361,7 +361,7 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) {
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
len += tEncodeDataBlock(buf, &callRsp->resultData);
len += encodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
}
return len;
......@@ -383,7 +383,7 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) {
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
case TSDB_UDF_CALL_AGG_FIN:
buf = tDecodeDataBlock(buf, &callRsp->resultData);
buf = decodeUdfInterBuf(buf, &callRsp->resultBuf);
break;
}
return (void*)buf;
......@@ -406,6 +406,13 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
*buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen));
}
if (buf == NULL) {
len += sizeof(rsp->seqNum);
} else {
*(int64_t*)(*buf) = rsp->seqNum;
*buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum));
}
len += taosEncodeFixedI64(buf, rsp->seqNum);
len += taosEncodeFixedI8(buf, rsp->type);
len += taosEncodeFixedI32(buf, rsp->code);
......@@ -430,6 +437,8 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) {
void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
rsp->msgLen = *(int32_t*)(buf);
buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen));
rsp->seqNum = *(int64_t*)(buf);
buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum));
buf = taosDecodeFixedI64(buf, &rsp->seqNum);
buf = taosDecodeFixedI8(buf, &rsp->type);
buf = taosDecodeFixedI32(buf, &rsp->code);
......@@ -453,15 +462,15 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) {
void freeUdfColumnData(SUdfColumnData *data) {
if (data->varLengthColumn) {
taosMemoryFree(data->varOffsets);
data->varOffsets = NULL;
taosMemoryFree(data->payload);
data->payload = NULL;
taosMemoryFree(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
taosMemoryFree(data->varLenCol.payload);
data->varLenCol.payload = NULL;
} else {
taosMemoryFree(data->nullBitmap);
data->nullBitmap = NULL;
taosMemoryFree(data->data);
data->data = NULL;
taosMemoryFree(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
taosMemoryFree(data->fixLenCol.data);
data->fixLenCol.data = NULL;
}
}
......@@ -488,9 +497,9 @@ void freeUdfInterBuf(SUdfInterBuf *buf) {
int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) {
udfBlock->numOfRows = block->info.rows;
udfBlock->numOfCols = block->info.numOfCols;
udfBlock->udfCols = taosMemoryMalloc(sizeof(SUdfColumn*) * udfBlock->numOfCols);
udfBlock->udfCols = taosMemoryCalloc(udfBlock->numOfCols, sizeof(SUdfColumn*));
for (int32_t i = 0; i < udfBlock->numOfCols; ++i) {
udfBlock->udfCols[i] = taosMemoryMalloc(sizeof(SUdfColumn));
udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn));
SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i);
SUdfColumn *udfCol = udfBlock->udfCols[i];
udfCol->colMeta.type = col->info.type;
......@@ -500,19 +509,23 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo
udfCol->colData.numOfRows = udfBlock->numOfRows;
udfCol->colData.varLengthColumn = IS_VAR_DATA_TYPE(udfCol->colMeta.type);
if (udfCol->colData.varLengthColumn) {
udfCol->colData.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varOffsets = taosMemoryMalloc(udfCol->colData.varOffsetsLen);
memcpy(udfCol->colData.varOffsets, col->varmeta.offset, udfCol->colData.varOffsetsLen);
udfCol->colData.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.payload = taosMemoryMalloc(udfCol->colData.payloadLen);
memcpy(udfCol->colData.payload, col->pData, udfCol->colData.payloadLen);
udfCol->colData.varLenCol.varOffsetsLen = sizeof(int32_t) * udfBlock->numOfRows;
udfCol->colData.varLenCol.varOffsets = taosMemoryMalloc(udfCol->colData.varLenCol.varOffsetsLen);
memcpy(udfCol->colData.varLenCol.varOffsets, col->varmeta.offset, udfCol->colData.varLenCol.varOffsetsLen);
udfCol->colData.varLenCol.payloadLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.varLenCol.payload = taosMemoryMalloc(udfCol->colData.varLenCol.payloadLen);
memcpy(udfCol->colData.varLenCol.payload, col->pData, udfCol->colData.varLenCol.payloadLen);
} else {
udfCol->colData.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
udfCol->colData.nullBitmap = taosMemoryMalloc(udfCol->colData.nullBitmapLen);
memcpy(udfCol->colData.nullBitmap, col->nullbitmap, udfCol->colData.nullBitmapLen);
udfCol->colData.dataLen = colDataGetLength(col, udfBlock->numOfRows);
udfCol->colData.data = taosMemoryMalloc(udfCol->colData.dataLen);
memcpy(udfCol->colData.data, col->pData, udfCol->colData.dataLen);
udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows);
int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen;
udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen);
char* bitmap = udfCol->colData.fixLenCol.nullBitmap;
memcpy(bitmap, col->nullbitmap, bitmapLen);
udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows);
int32_t dataLen = udfCol->colData.fixLenCol.dataLen;
udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen);
char* data = udfCol->colData.fixLenCol.data;
memcpy(data, col->pData, dataLen);
}
}
return 0;
......@@ -534,15 +547,15 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
SUdfColumnData *data = &udfCol->colData;
if (!IS_VAR_DATA_TYPE(meta->type)) {
col->nullbitmap = taosMemoryMalloc(data->nullBitmapLen);
memcpy(col->nullbitmap, data->nullBitmap, data->nullBitmapLen);
col->pData = taosMemoryMalloc(data->dataLen);
memcpy(col->pData, data->payload, data->dataLen);
col->nullbitmap = taosMemoryMalloc(data->fixLenCol.nullBitmapLen);
memcpy(col->nullbitmap, data->fixLenCol.nullBitmap, data->fixLenCol.nullBitmapLen);
col->pData = taosMemoryMalloc(data->fixLenCol.dataLen);
memcpy(col->pData, data->fixLenCol.data, data->fixLenCol.dataLen);
} else {
col->varmeta.offset = taosMemoryMalloc(data->varOffsetsLen);
memcpy(col->varmeta.offset, data->varOffsets, data->varOffsetsLen);
col->pData = taosMemoryMalloc(data->payloadLen);
memcpy(col->pData, data->payload, data->payloadLen);
col->varmeta.offset = taosMemoryMalloc(data->varLenCol.varOffsetsLen);
memcpy(col->varmeta.offset, data->varLenCol.varOffsets, data->varLenCol.varOffsetsLen);
col->pData = taosMemoryMalloc(data->varLenCol.payloadLen);
memcpy(col->pData, data->varLenCol.payload, data->varLenCol.payloadLen);
}
return 0;
}
......@@ -932,7 +945,7 @@ void onUdfdExit(uv_process_t *req, int64_t exit_status, int term_signal) {
gUdfcState = UDFC_STATE_RESTARTING;
//TODO: asynchronous without blocking. how to do it
cleanUpUvTasks();
//startUdfd();
startUdfd();
}
}
......@@ -967,7 +980,7 @@ void constructUdfService(void *argsThread) {
uv_loop_init(&gUdfdLoop);
//TODO spawn error
//startUdfd();
startUdfd();
uv_async_init(&gUdfdLoop, &gUdfLoopTaskAync, udfClientAsyncCb);
uv_async_init(&gUdfdLoop, &gUdfLoopStopAsync, udfStopAsyncCb);
......@@ -1053,6 +1066,7 @@ int32_t callUdf(UdfHandle handle, int8_t callType, SSDataBlock *input, SUdfInter
SUdfCallRequest *req = &task->_call.req;
req->udfHandle = task->session->severHandle;
req->callType = callType;
switch (callType) {
case TSDB_UDF_CALL_AGG_INIT: {
......
......@@ -84,9 +84,10 @@ void udfdProcessRequest(uv_work_t *req) {
//TODO error, multi-thread, same udf, lock it
//TODO find all functions normal, init, destroy, normal, merge, finalize
uv_dlsym(&udf->lib, normalFuncName, (void **) (&udf->scalarProcFunc));
char freeFuncName[TSDB_FUNC_NAME_LEN + 5];
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
strcat(freeFuncName, "_free");
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
SUdfHandle *handle = taosMemoryMalloc(sizeof(SUdfHandle));
......@@ -118,7 +119,7 @@ void udfdProcessRequest(uv_work_t *req) {
SUdfDataBlock input = {0};
convertDataBlockToUdfDataBlock(&call->block, &input);
SUdfColumn output;
SUdfColumn output = {0};
//TODO: call different functions according to call type, for now just calar
if (call->callType == TSDB_UDF_CALL_SCALA_PROC) {
udf->scalarProcFunc(input, &output);
......@@ -131,6 +132,7 @@ void udfdProcessRequest(uv_work_t *req) {
rsp->type = request.type;
rsp->code = 0;
SUdfCallResponse *subRsp = &rsp->callRsp;
subRsp->callType = call->callType;
convertUdfColumnToDataBlock(&output, &subRsp->resultData);
}
......
......@@ -30,7 +30,7 @@ int main(int argc, char *argv[]) {
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[1] = {0};
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
......@@ -47,6 +47,10 @@ int main(int argc, char *argv[]) {
SSDataBlock output = {0};
callUdfScalaProcess(handle, pBlock, &output);
SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0);
for (int32_t i = 0; i < output.info.rows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
}
teardownUdf(handle);
stopUdfService();
......
......@@ -17,44 +17,53 @@ int32_t udf1_teardown() {
return 0;
}
int32_t udf1(SUdfDataBlock block, SUdfColumnData *resultData) {
int32_t udf1(SUdfDataBlock block, SUdfColumn *resultCol) {
SUdfColumnData *resultData = &resultCol->colData;
resultData->numOfRows = block.numOfRows;
SUdfColumnData *srcData = &block.udfCols[0]->colData;
resultData->varLengthColumn = srcData->varLengthColumn;
if (resultData->varLengthColumn) {
resultData->varOffsetsLen = srcData->varOffsetsLen;
resultData->varOffsets = malloc(resultData->varOffsetsLen);
memcpy(resultData->varOffsets, srcData->varOffsets, srcData->varOffsetsLen);
resultData->varLenCol.varOffsetsLen = srcData->varLenCol.varOffsetsLen;
resultData->varLenCol.varOffsets = malloc(resultData->varLenCol.varOffsetsLen);
memcpy(resultData->varLenCol.varOffsets, srcData->varLenCol.varOffsets, srcData->varLenCol.varOffsetsLen);
resultData->payloadLen = srcData->payloadLen;
resultData->payload = malloc(resultData->payloadLen);
memcpy(resultData->payload, srcData->payload, srcData->payloadLen);
resultData->varLenCol.payloadLen = srcData->varLenCol.payloadLen;
resultData->varLenCol.payload = malloc(resultData->varLenCol.payloadLen);
memcpy(resultData->varLenCol.payload, srcData->varLenCol.payload, srcData->varLenCol.payloadLen);
} else {
resultData->nullBitmapLen = srcData->nullBitmapLen;
resultData->nullBitmap = malloc(resultData->nullBitmapLen);
memcpy(resultData->nullBitmap, srcData->nullBitmap, srcData->nullBitmapLen);
resultData->fixLenCol.nullBitmapLen = srcData->fixLenCol.nullBitmapLen;
resultData->fixLenCol.nullBitmap = malloc(resultData->fixLenCol.nullBitmapLen);
memcpy(resultData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmap, srcData->fixLenCol.nullBitmapLen);
resultData->dataLen = srcData->dataLen;
resultData->data = malloc(resultData->dataLen);
memcpy(resultData->data, srcData->data, srcData->dataLen);
resultData->fixLenCol.dataLen = srcData->fixLenCol.dataLen;
resultData->fixLenCol.data = malloc(resultData->fixLenCol.dataLen);
memcpy(resultData->fixLenCol.data, srcData->fixLenCol.data, srcData->fixLenCol.dataLen);
for (int32_t i = 0; i < resultData->numOfRows; ++i) {
*(resultData->fixLenCol.data + i * sizeof(int32_t)) = 88;
}
}
SUdfColumnMeta *meta = &resultCol->colMeta;
meta->bytes = 4;
meta->type = TSDB_DATA_TYPE_INT;
meta->scale = 0;
meta->precision = 0;
return 0;
}
int32_t udf1_free(SUdfColumnData *data) {
int32_t udf1_free(SUdfColumn *col) {
SUdfColumnData *data = &col->colData;
if (data->varLengthColumn) {
free(data->varOffsets);
data->varOffsets = NULL;
free(data->payload);
data->payload = NULL;
free(data->varLenCol.varOffsets);
data->varLenCol.varOffsets = NULL;
free(data->varLenCol.payload);
data->varLenCol.payload = NULL;
} else {
free(data->nullBitmap);
data->nullBitmap = NULL;
free(data->data);
data->data = NULL;
free(data->fixLenCol.nullBitmap);
data->fixLenCol.nullBitmap = NULL;
free(data->fixLenCol.data);
data->fixLenCol.data = NULL;
}
return 0;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册