From c4b3da50328cdc23e720916c2c6501663348c18c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 13 Oct 2022 14:15:42 +0800 Subject: [PATCH] more code format --- source/libs/function/inc/builtinsimpl.h | 162 +++---- source/libs/function/inc/tfunctionInt.h | 10 +- source/libs/function/inc/thistogram.h | 16 +- source/libs/function/inc/tpercentile.h | 6 +- source/libs/function/inc/tscript.h | 8 +- source/libs/function/inc/tudfInt.h | 34 +- source/libs/function/inc/udfc.h | 78 ++- source/libs/function/src/builtinsimpl.c | 7 +- source/libs/function/src/tfunctionInt.c | 18 +- source/libs/function/src/thistogram.c | 14 +- source/libs/function/src/tpercentile.c | 159 +++--- source/libs/function/src/tscript.c | 4 +- source/libs/function/src/tudf.c | 614 +++++++++++------------- source/libs/function/test/runUdf.c | 13 +- source/libs/function/test/udf1.c | 21 +- source/libs/function/test/udf2.c | 29 +- tools/scripts/codeFormat.sh | 4 +- 17 files changed, 574 insertions(+), 623 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 0880f2f5c7..b5b7453e7a 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -23,37 +23,37 @@ extern "C" { #include "function.h" #include "functionMgt.h" -bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv)); -bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo)); +bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv)); +bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo)); int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)); int32_t dummyFinalize(SqlFunctionCtx* UNUSED_PARAM(pCtx), SSDataBlock* UNUSED_PARAM(pBlock)); -bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult); int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); EFuncDataRequired countDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); -bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -int32_t countFunction(SqlFunctionCtx *pCtx); -int32_t countInvertFunction(SqlFunctionCtx *pCtx); +bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t countFunction(SqlFunctionCtx* pCtx); +int32_t countInvertFunction(SqlFunctionCtx* pCtx); EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow); -bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -int32_t sumFunction(SqlFunctionCtx *pCtx); -int32_t sumInvertFunction(SqlFunctionCtx *pCtx); -int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t sumFunction(SqlFunctionCtx* pCtx); +int32_t sumInvertFunction(SqlFunctionCtx* pCtx); +int32_t sumCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool minmaxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool minmaxFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t minFunction(SqlFunctionCtx* pCtx); -int32_t maxFunction(SqlFunctionCtx *pCtx); +int32_t maxFunction(SqlFunctionCtx* pCtx); int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t minCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t maxCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool avgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getAvgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t avgFunction(SqlFunctionCtx* pCtx); int32_t avgFunctionMerge(SqlFunctionCtx* pCtx); int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -62,8 +62,8 @@ int32_t avgInvertFunction(SqlFunctionCtx* pCtx); int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getAvgInfoSize(); -bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool stddevFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool stddevFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stddevFunction(SqlFunctionCtx* pCtx); int32_t stddevFunctionMerge(SqlFunctionCtx* pCtx); int32_t stddevFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -72,63 +72,63 @@ int32_t stddevInvertFunction(SqlFunctionCtx* pCtx); int32_t stddevCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getStddevInfoSize(); -bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool leastSQRFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getLeastSQRFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool leastSQRFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t leastSQRFunction(SqlFunctionCtx* pCtx); int32_t leastSQRFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t leastSQRInvertFunction(SqlFunctionCtx* pCtx); int32_t leastSQRCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t percentileFunction(SqlFunctionCtx *pCtx); +bool getPercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool percentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t percentileFunction(SqlFunctionCtx* pCtx); int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t apercentileFunction(SqlFunctionCtx *pCtx); +bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t apercentileFunction(SqlFunctionCtx* pCtx); int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx); int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t apercentileCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getApercentileMaxSize(); -bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); -int32_t diffFunction(SqlFunctionCtx *pCtx); +bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t diffFunction(SqlFunctionCtx* pCtx); -bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool derivativeFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); -int32_t derivativeFunction(SqlFunctionCtx *pCtx); +bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t derivativeFunction(SqlFunctionCtx* pCtx); -bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool irateFuncSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo); -int32_t irateFunction(SqlFunctionCtx *pCtx); +bool getIrateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool irateFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo); +int32_t irateFunction(SqlFunctionCtx* pCtx); int32_t irateFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx); -bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -int32_t firstFunction(SqlFunctionCtx *pCtx); -int32_t firstFunctionMerge(SqlFunctionCtx *pCtx); -int32_t lastFunction(SqlFunctionCtx *pCtx); -int32_t lastFunctionMerge(SqlFunctionCtx *pCtx); -int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -int32_t getFirstLastInfoSize(int32_t resBytes); +bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +int32_t firstFunction(SqlFunctionCtx* pCtx); +int32_t firstFunctionMerge(SqlFunctionCtx* pCtx); +int32_t lastFunction(SqlFunctionCtx* pCtx); +int32_t lastFunctionMerge(SqlFunctionCtx* pCtx); +int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t firstLastPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); +int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t lastCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); +int32_t getFirstLastInfoSize(int32_t resBytes); EFuncDataRequired lastDynDataReq(void* pRes, STimeWindow* pTimeWindow); -int32_t lastRowFunction(SqlFunctionCtx *pCtx); +int32_t lastRowFunction(SqlFunctionCtx* pCtx); -bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); -bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); -bool topBotFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t topFunction(SqlFunctionCtx *pCtx); -int32_t topFunctionMerge(SqlFunctionCtx *pCtx); -int32_t bottomFunction(SqlFunctionCtx *pCtx); -int32_t bottomFunctionMerge(SqlFunctionCtx *pCtx); +bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +bool getTopBotMergeFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); +bool topBotFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t topFunction(SqlFunctionCtx* pCtx); +int32_t topFunctionMerge(SqlFunctionCtx* pCtx); +int32_t bottomFunction(SqlFunctionCtx* pCtx); +int32_t bottomFunctionMerge(SqlFunctionCtx* pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t topBotMergeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -136,8 +136,8 @@ int32_t topCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t bottomCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); int32_t getTopBotInfoSize(int64_t numOfItems); -bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool spreadFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool spreadFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t spreadFunction(SqlFunctionCtx* pCtx); int32_t spreadFunctionMerge(SqlFunctionCtx* pCtx); int32_t spreadFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -145,8 +145,8 @@ int32_t spreadPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getSpreadInfoSize(); int32_t spreadCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool elapsedFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getElapsedFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool elapsedFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t elapsedFunction(SqlFunctionCtx* pCtx); int32_t elapsedFunctionMerge(SqlFunctionCtx* pCtx); int32_t elapsedFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -154,8 +154,8 @@ int32_t elapsedPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getElapsedInfoSize(); int32_t elapsedCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool histogramFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getHistogramFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool histogramFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t histogramFunction(SqlFunctionCtx* pCtx); int32_t histogramFunctionPartial(SqlFunctionCtx* pCtx); int32_t histogramFunctionMerge(SqlFunctionCtx* pCtx); @@ -164,7 +164,7 @@ int32_t histogramPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getHistogramInfoSize(); int32_t histogramCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool getHLLFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t hllFunction(SqlFunctionCtx* pCtx); int32_t hllFunctionMerge(SqlFunctionCtx* pCtx); int32_t hllFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); @@ -172,48 +172,48 @@ int32_t hllPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); int32_t getHLLInfoSize(); int32_t hllCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx); -bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool stateFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getStateFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool stateFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t stateCountFunction(SqlFunctionCtx* pCtx); int32_t stateDurationFunction(SqlFunctionCtx* pCtx); -bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool getCsumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t csumFunction(SqlFunctionCtx* pCtx); -bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool mavgFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getMavgFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool mavgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t mavgFunction(SqlFunctionCtx* pCtx); -bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool sampleFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t sampleFunction(SqlFunctionCtx* pCtx); int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); +bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool tailFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); int32_t tailFunction(SqlFunctionCtx* pCtx); -bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool uniqueFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t uniqueFunction(SqlFunctionCtx *pCtx); +bool getUniqueFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool uniqueFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t uniqueFunction(SqlFunctionCtx* pCtx); -bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool modeFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t modeFunction(SqlFunctionCtx *pCtx); +bool getModeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t modeFunction(SqlFunctionCtx* pCtx); int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool twaFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t twaFunction(SqlFunctionCtx *pCtx); -int32_t twaFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); +bool getTwaFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool twaFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t twaFunction(SqlFunctionCtx* pCtx); +int32_t twaFinalize(struct SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool blockDistSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); -int32_t blockDistFunction(SqlFunctionCtx *pCtx); +bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo); +int32_t blockDistFunction(SqlFunctionCtx* pCtx); int32_t blockDistFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); -bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +bool getGroupKeyFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); int32_t groupKeyFunction(SqlFunctionCtx* pCtx); int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); diff --git a/source/libs/function/inc/tfunctionInt.h b/source/libs/function/inc/tfunctionInt.h index 8f6cbc977e..821544106f 100644 --- a/source/libs/function/inc/tfunctionInt.h +++ b/source/libs/function/inc/tfunctionInt.h @@ -22,11 +22,11 @@ extern "C" { #include "os.h" -#include "tname.h" -#include "taosdef.h" -#include "tvariant.h" #include "function.h" +#include "taosdef.h" +#include "tname.h" #include "tudf.h" +#include "tvariant.h" bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const char *maxval); @@ -37,8 +37,8 @@ bool topbot_datablock_filter(SqlFunctionCtx *pCtx, const char *minval, const cha static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32_t bufLen) { pResInfo->initialized = true; // the this struct has been initialized flag - pResInfo->complete = false; - pResInfo->numOfRes = 0; + pResInfo->complete = false; + pResInfo->numOfRes = 0; memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); } diff --git a/source/libs/function/inc/thistogram.h b/source/libs/function/inc/thistogram.h index cb6560325b..20111086cd 100644 --- a/source/libs/function/inc/thistogram.h +++ b/source/libs/function/inc/thistogram.h @@ -43,15 +43,15 @@ typedef struct SHistogramInfo { int64_t numOfElems; int32_t numOfEntries; int32_t maxEntries; - double min; - double max; + double min; + double max; #if defined(USE_ARRAYLIST) SHistBin* elems; #else - tSkipList* pList; + tSkipList* pList; SMultiwayMergeTreeInfo* pLoserTree; - int32_t maxIndex; - bool ordered; + int32_t maxIndex; + bool ordered; #endif } SHistogramInfo; @@ -61,16 +61,16 @@ SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins); int32_t tHistogramAdd(SHistogramInfo** pHisto, double val); int64_t tHistogramSum(SHistogramInfo* pHisto, double v); -double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num); +double* tHistogramUniform(SHistogramInfo* pHisto, double* ratio, int32_t num); SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2, int32_t numOfEntries); -void tHistogramDestroy(SHistogramInfo** pHisto); +void tHistogramDestroy(SHistogramInfo** pHisto); void tHistogramPrint(SHistogramInfo* pHisto); int32_t histoBinarySearch(SHistBin* pEntry, int32_t len, double val); SHeapEntry* tHeapCreate(int32_t numOfEntries); -void tHeapSort(SHeapEntry* pEntry, int32_t len); +void tHeapSort(SHeapEntry* pEntry, int32_t len); #ifdef __cplusplus } diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 554f9e567f..873dc46a08 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -61,10 +61,10 @@ typedef struct tMemBucket { MinMaxEntry range; // value range int32_t times; // count that has been checked for deciding the correct data value buckets. __compar_fn_t comparFn; - tMemBucketSlot* pSlots; - SDiskbasedBuf* pBuffer; + tMemBucketSlot *pSlots; + SDiskbasedBuf *pBuffer; __perc_hash_func_t hashFunc; - SHashObj* groupPagesMap; // disk page map for different groups; + SHashObj *groupPagesMap; // disk page map for different groups; } tMemBucket; tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, double maxval); diff --git a/source/libs/function/inc/tscript.h b/source/libs/function/inc/tscript.h index 823e2d61f2..730d29a8c9 100644 --- a/source/libs/function/inc/tscript.h +++ b/source/libs/function/inc/tscript.h @@ -17,18 +17,18 @@ #define TDENGINE_QSCRIPT_H #if 0 -#include #include +#include #include -#include "tutil.h" #include "hash.h" #include "tlist.h" #include "tudf.h" +#include "tutil.h" #define MAX_FUNC_NAME 64 -#define USER_FUNC_NAME "funcName" +#define USER_FUNC_NAME "funcName" #define USER_FUNC_NAME_LIMIT 48 enum ScriptState { @@ -81,4 +81,4 @@ void scriptEnvPoolCleanup(); bool isValidScript(char *script, int32_t len); #endif -#endif //TDENGINE_QSCRIPT_H +#endif // TDENGINE_QSCRIPT_H diff --git a/source/libs/function/inc/tudfInt.h b/source/libs/function/inc/tudfInt.h index 6f82542aee..c1b0941d4b 100644 --- a/source/libs/function/inc/tudfInt.h +++ b/source/libs/function/inc/tudfInt.h @@ -40,33 +40,31 @@ typedef struct SUdfSetupRequest { typedef struct SUdfSetupResponse { int64_t udfHandle; - int8_t outputType; + int8_t outputType; int32_t outputLen; int32_t bufSize; } SUdfSetupResponse; typedef struct SUdfCallRequest { int64_t udfHandle; - int8_t callType; + int8_t callType; - SSDataBlock block; + SSDataBlock block; SUdfInterBuf interBuf; SUdfInterBuf interBuf2; - int8_t initFirst; + int8_t initFirst; } SUdfCallRequest; typedef struct SUdfCallResponse { - int8_t callType; - SSDataBlock resultData; + int8_t callType; + SSDataBlock resultData; SUdfInterBuf resultBuf; } SUdfCallResponse; - typedef struct SUdfTeardownRequest { int64_t udfHandle; } SUdfTeardownRequest; - typedef struct SUdfTeardownResponse { #ifdef WINDOWS size_t avoidCompilationErrors; @@ -79,8 +77,8 @@ typedef struct SUdfRequest { int8_t type; union { - SUdfSetupRequest setup; - SUdfCallRequest call; + SUdfSetupRequest setup; + SUdfCallRequest call; SUdfTeardownRequest teardown; }; } SUdfRequest; @@ -89,29 +87,29 @@ typedef struct SUdfResponse { int32_t msgLen; int64_t seqNum; - int8_t type; + int8_t type; int32_t code; union { - SUdfSetupResponse setupRsp; - SUdfCallResponse callRsp; + SUdfSetupResponse setupRsp; + SUdfCallResponse callRsp; SUdfTeardownResponse teardownRsp; }; } SUdfResponse; -int32_t encodeUdfRequest(void **buf, const SUdfRequest* request); -void* decodeUdfRequest(const void *buf, SUdfRequest* request); +int32_t encodeUdfRequest(void **buf, const SUdfRequest *request); +void *decodeUdfRequest(const void *buf, SUdfRequest *request); int32_t encodeUdfResponse(void **buf, const SUdfResponse *response); -void* decodeUdfResponse(const void* buf, SUdfResponse *response); +void *decodeUdfResponse(const void *buf, SUdfResponse *response); void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); -void freeUdfColumn(SUdfColumn* col); +void freeUdfColumn(SUdfColumn *col); void freeUdfDataDataBlock(SUdfDataBlock *block); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); -int32_t getUdfdPipeName(char* pipeName, int32_t size); +int32_t getUdfdPipeName(char *pipeName, int32_t size); #ifdef __cplusplus } #endif diff --git a/source/libs/function/inc/udfc.h b/source/libs/function/inc/udfc.h index f414c2b29e..c46e500ccb 100644 --- a/source/libs/function/inc/udfc.h +++ b/source/libs/function/inc/udfc.h @@ -14,20 +14,20 @@ #endif enum { - UDF_TASK_SETUP = 0, - UDF_TASK_CALL = 1, - UDF_TASK_TEARDOWN = 2 + UDF_TASK_SETUP = 0, + UDF_TASK_CALL = 1, + UDF_TASK_TEARDOWN = 2 }; -typedef struct SSDataBlock{ - char *data; - int32_t size; +typedef struct SSDataBlock { + char *data; + int32_t size; } SSDataBlock; typedef struct SUdfInfo { - char *udfName; - char *path; + char *udfName; + char *path; } SUdfInfo; typedef void *UdfcFuncHandle; @@ -36,9 +36,9 @@ int32_t createUdfdProxy(); int32_t destroyUdfdProxy(); -//int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles); +// int32_t setupUdf(SUdfInfo *udf, int32_t numOfUdfs, UdfcFuncHandle *handles); -int32_t setupUdf(SUdfInfo* udf, UdfcFuncHandle* handle); +int32_t setupUdf(SUdfInfo *udf, UdfcFuncHandle *handle); int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSize, SSDataBlock input, char **newstate, int32_t *newStateSize, SSDataBlock *output); @@ -46,43 +46,39 @@ int32_t callUdf(UdfcFuncHandle handle, int8_t step, char *state, int32_t stateSi int32_t doTeardownUdf(UdfcFuncHandle handle); typedef struct SUdfSetupRequest { - char udfName[16]; // - int8_t scriptType; // 0:c, 1: lua, 2:js - int8_t udfType; //udaf, udf, udtf - int16_t pathSize; - char *path; + char udfName[16]; // + int8_t scriptType; // 0:c, 1: lua, 2:js + int8_t udfType; // udaf, udf, udtf + int16_t pathSize; + char *path; } SUdfSetupRequest; typedef struct SUdfSetupResponse { - int64_t udfHandle; + int64_t udfHandle; } SUdfSetupResponse; - typedef struct SUdfCallRequest { - int64_t udfHandle; - int8_t step; + int64_t udfHandle; + int8_t step; - int32_t inputBytes; - char *input; + int32_t inputBytes; + char *input; - int32_t stateBytes; - char *state; + int32_t stateBytes; + char *state; } SUdfCallRequest; - typedef struct SUdfCallResponse { - int32_t outputBytes; - char *output; - int32_t newStateBytes; - char *newState; + int32_t outputBytes; + char *output; + int32_t newStateBytes; + char *newState; } SUdfCallResponse; - typedef struct SUdfTeardownRequest { - int64_t udfHandle; + int64_t udfHandle; } SUdfTeardownRequest; - typedef struct SUdfTeardownResponse { #ifdef WINDOWS size_t avoidCompilationErrors; @@ -90,24 +86,24 @@ typedef struct SUdfTeardownResponse { } SUdfTeardownResponse; typedef struct SUdfRequest { - int32_t msgLen; - int64_t seqNum; + int32_t msgLen; + int64_t seqNum; - int8_t type; - void *subReq; + int8_t type; + void *subReq; } SUdfRequest; typedef struct SUdfResponse { - int32_t msgLen; - int64_t seqNum; + int32_t msgLen; + int64_t seqNum; - int8_t type; - int32_t code; - void *subRsp; + int8_t type; + int32_t code; + void *subRsp; } SUdfResponse; int32_t decodeRequest(char *buf, int32_t bufLen, SUdfRequest **pRequest); int32_t encodeResponse(char **buf, int32_t *bufLen, SUdfResponse *response); int32_t encodeRequest(char **buf, int32_t *bufLen, SUdfRequest *request); int32_t decodeResponse(char *buf, int32_t bufLen, SUdfResponse **pResponse); -#endif //UDF_UDF_H +#endif // UDF_UDF_H diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index d91423ab5c..7077a9b780 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1557,8 +1557,8 @@ int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { numOfElems += 1; } } else if (type == TSDB_DATA_TYPE_FLOAT) { - float* pData = (float*)pCol->pData; - float* val = (float*)&pBuf->v; + float* pData = (float*)pCol->pData; + float* val = (float*)&pBuf->v; for (int32_t i = start; i < start + numOfRows; ++i) { if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { @@ -2977,7 +2977,8 @@ int32_t lastFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } -static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, int32_t rowIndex) { +static void firstLastTransferInfo(SqlFunctionCtx* pCtx, SFirstLastRes* pInput, SFirstLastRes* pOutput, bool isFirst, + int32_t rowIndex) { SInputColumnInfoData* pColInfo = &pCtx->input; if (pOutput->hasResult) { diff --git a/source/libs/function/src/tfunctionInt.c b/source/libs/function/src/tfunctionInt.c index ff15b7714f..70378df0c3 100644 --- a/source/libs/function/src/tfunctionInt.c +++ b/source/libs/function/src/tfunctionInt.c @@ -15,8 +15,8 @@ #include "os.h" #include "taosdef.h" -#include "tmsg.h" #include "thash.h" +#include "tmsg.h" #include "ttypes.h" #include "function.h" @@ -29,15 +29,13 @@ #include "ttszip.h" #include "tudf.h" -void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) { - pCell->initialized = false; -} +void cleanupResultRowEntry(struct SResultRowEntryInfo* pCell) { pCell->initialized = false; } int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock) { int32_t maxRows = 0; for (int32_t j = 0; j < num; ++j) { - SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[j]); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[j]); if (pResInfo != NULL && maxRows < pResInfo->numOfRes) { maxRows = pResInfo->numOfRes; } @@ -46,12 +44,12 @@ int32_t getNumOfResult(SqlFunctionCtx* pCtx, int32_t num, SSDataBlock* pResBlock assert(maxRows >= 0); blockDataEnsureCapacity(pResBlock, maxRows); - for(int32_t i = 0; i < num; ++i) { + for (int32_t i = 0; i < num; ++i) { SColumnInfoData* pCol = taosArrayGet(pResBlock->pDataBlock, i); - SResultRowEntryInfo *pResInfo = GET_RES_INFO(&pCtx[i]); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(&pCtx[i]); if (pResInfo->numOfRes == 0) { - for(int32_t j = 0; j < pResInfo->numOfRes; ++j) { + for (int32_t j = 0; j < pResInfo->numOfRes; ++j) { colDataAppend(pCol, j, NULL, true); // TODO add set null data api } } else { @@ -70,6 +68,4 @@ bool isRowEntryCompleted(struct SResultRowEntryInfo* pEntry) { return pEntry->complete; } -bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) { - return pEntry->initialized; -} +bool isRowEntryInitialized(struct SResultRowEntryInfo* pEntry) { return pEntry->initialized; } diff --git a/source/libs/function/src/thistogram.c b/source/libs/function/src/thistogram.c index 2e60498aba..b0f23f78df 100644 --- a/source/libs/function/src/thistogram.c +++ b/source/libs/function/src/thistogram.c @@ -14,10 +14,10 @@ */ #include "os.h" -#include "thistogram.h" #include "taosdef.h" -#include "tmsg.h" +#include "thistogram.h" #include "tlosertree.h" +#include "tmsg.h" /** * @@ -54,7 +54,7 @@ SHistogramInfo* tHistogramCreateFrom(void* pBuf, int32_t numOfBins) { SHistogramInfo* pHisto = (SHistogramInfo*)pBuf; pHisto->elems = (SHistBin*)((char*)pBuf + sizeof(SHistogramInfo)); - for(int32_t i = 0; i < numOfBins; ++i) { + for (int32_t i = 0; i < numOfBins; ++i) { pHisto->elems[i].val = -DBL_MAX; } @@ -116,7 +116,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { pEntry1->delta = ((SHistBin*)pResNode->pForward[0]->pData)->val - val; if ((*pHisto)->ordered) { - int32_t lastIndex = (*pHisto)->maxIndex; + int32_t lastIndex = (*pHisto)->maxIndex; SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; (*pHisto)->pLoserTree->pNode[lastIndex + pTree->numOfEntries].pData = pResNode; @@ -156,7 +156,7 @@ int32_t tHistogramAdd(SHistogramInfo** pHisto, double val) { SSkipListPrint((*pHisto)->pList, 1); SMultiwayMergeTreeInfo* pTree = (*pHisto)->pLoserTree; - tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; + tSkipListNode* pHead = (*pHisto)->pList->pHead.pForward[0]; tSkipListNode* p1 = pHead; @@ -357,7 +357,7 @@ void tHistogramDestroy(SHistogramInfo** pHisto) { } void tHistogramPrint(SHistogramInfo* pHisto) { - printf("total entries: %d, elements: %"PRId64 "\n", pHisto->numOfEntries, pHisto->numOfElems); + printf("total entries: %d, elements: %" PRId64 "\n", pHisto->numOfEntries, pHisto->numOfElems); #if defined(USE_ARRAYLIST) for (int32_t i = 0; i < pHisto->numOfEntries; ++i) { printf("%d: (%f, %" PRId64 ")\n", i + 1, pHisto->elems[i].val, pHisto->elems[i].num); @@ -536,7 +536,7 @@ SHistogramInfo* tHistogramMerge(SHistogramInfo* pHisto1, SHistogramInfo* pHisto2 } SHistBin* pHistoBins = taosMemoryCalloc(1, sizeof(SHistBin) * (pHisto1->numOfEntries + pHisto2->numOfEntries)); - int32_t i = 0, j = 0, k = 0; + int32_t i = 0, j = 0, k = 0; while (i < pHisto1->numOfEntries && j < pHisto2->numOfEntries) { if (pHisto1->elems[i].val < pHisto2->elems[j].val) { diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 4c58c0abe5..62c5e4b28b 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -14,8 +14,8 @@ */ #include "taoserror.h" -#include "tglobal.h" #include "tcompare.h" +#include "tglobal.h" #include "taosdef.h" #include "tcompare.h" @@ -25,21 +25,20 @@ #define DEFAULT_NUM_OF_SLOT 1024 -int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { - return (times * numOfSlots) + slotIndex; -} +int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { return (times * numOfSlots) + slotIndex; } static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) { - SFilePage *buffer = (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); + SFilePage *buffer = + (SFilePage *)taosMemoryCalloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(SFilePage)); int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); - SArray* pIdList = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); + SArray *pIdList = *(SArray **)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); int32_t offset = 0; - for(int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) { - int32_t* pageId = taosArrayGet(pIdList, i); + for (int32_t i = 0; i < taosArrayGetSize(pIdList); ++i) { + int32_t *pageId = taosArrayGet(pIdList, i); - SFilePage* pg = getBufPage(pMemBucket->pBuffer, *pageId); + SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); @@ -49,7 +48,7 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) return buffer; } -static void resetBoundingBox(MinMaxEntry* range, int32_t type) { +static void resetBoundingBox(MinMaxEntry *range, int32_t type) { if (IS_SIGNED_NUMERIC_TYPE(type)) { range->i64MaxVal = INT64_MIN; range->i64MinVal = INT64_MAX; @@ -62,17 +61,17 @@ static void resetBoundingBox(MinMaxEntry* range, int32_t type) { } } -static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, double maxval) { +static int32_t setBoundingBox(MinMaxEntry *range, int16_t type, double minval, double maxval) { if (minval > maxval) { return -1; } if (IS_SIGNED_NUMERIC_TYPE(type)) { - range->i64MinVal = (int64_t) minval; - range->i64MaxVal = (int64_t) maxval; - } else if (IS_UNSIGNED_NUMERIC_TYPE(type)){ - range->u64MinVal = (uint64_t) minval; - range->u64MaxVal = (uint64_t) maxval; + range->i64MinVal = (int64_t)minval; + range->i64MaxVal = (int64_t)maxval; + } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { + range->u64MinVal = (uint64_t)minval; + range->u64MaxVal = (uint64_t)maxval; } else { range->dMinVal = minval; range->dMaxVal = maxval; @@ -81,10 +80,10 @@ static int32_t setBoundingBox(MinMaxEntry* range, int16_t type, double minval, d return 0; } -static void resetPosInfo(SSlotInfo* pInfo) { - pInfo->size = 0; +static void resetPosInfo(SSlotInfo *pInfo) { + pInfo->size = 0; pInfo->pageId = -1; - pInfo->data = NULL; + pInfo->data = NULL; } double findOnlyResult(tMemBucket *pMemBucket) { @@ -92,16 +91,16 @@ double findOnlyResult(tMemBucket *pMemBucket) { for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; - if (pSlot->info.size == 0) { + if (pSlot->info.size == 0) { continue; } int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); - SArray* list = *(SArray**)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); + SArray *list = *(SArray **)taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); assert(list->size == 1); - int32_t* pageId = taosArrayGet(list, 0); - SFilePage* pPage = getBufPage(pMemBucket->pBuffer, *pageId); + int32_t *pageId = taosArrayGet(list, 0); + SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); assert(pPage->num == 1); double v = 0; @@ -121,14 +120,14 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { if (v > pBucket->range.i64MaxVal || v < pBucket->range.i64MinVal) { return index; } - + // divide the value range into 1024 buckets uint64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; if (span < pBucket->numOfSlots) { int64_t delta = v - pBucket->range.i64MinVal; index = (delta % pBucket->numOfSlots); } else { - double slotSpan = ((double)span) / pBucket->numOfSlots; + double slotSpan = ((double)span) / pBucket->numOfSlots; uint64_t delta = v - pBucket->range.i64MinVal; index = (int32_t)(delta / slotSpan); @@ -150,12 +149,12 @@ int32_t tBucketUintHash(tMemBucket *pBucket, const void *value) { if (v > pBucket->range.u64MaxVal || v < pBucket->range.u64MinVal) { return index; } - + // divide the value range into 1024 buckets uint64_t span = pBucket->range.u64MaxVal - pBucket->range.u64MinVal; if (span < pBucket->numOfSlots) { int64_t delta = v - pBucket->range.u64MinVal; - index = (int32_t) (delta % pBucket->numOfSlots); + index = (int32_t)(delta % pBucket->numOfSlots); } else { double slotSpan = (double)span / pBucket->numOfSlots; index = (int32_t)((v - pBucket->range.u64MinVal) / slotSpan); @@ -209,9 +208,9 @@ static __perc_hash_func_t getHashFunc(int32_t type) { } } -static void resetSlotInfo(tMemBucket* pBucket) { +static void resetSlotInfo(tMemBucket *pBucket) { for (int32_t i = 0; i < pBucket->numOfSlots; ++i) { - tMemBucketSlot* pSlot = &pBucket->pSlots[i]; + tMemBucketSlot *pSlot = &pBucket->pSlots[i]; resetBoundingBox(&pSlot->range, pBucket->type); resetPosInfo(&pSlot->info); @@ -225,9 +224,9 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, } pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; - pBucket->bufPageSize = 16384 * 4; // 16k per page + pBucket->bufPageSize = 16384 * 4; // 16k per page - pBucket->type = dataType; + pBucket->type = dataType; pBucket->bytes = nElemSize; pBucket->total = 0; pBucket->times = 1; @@ -235,17 +234,17 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, pBucket->maxCapacity = 200000; pBucket->groupPagesMap = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK); if (setBoundingBox(&pBucket->range, pBucket->type, minval, maxval) != 0) { -// qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); + // qError("MemBucket:%p, invalid value range: %f-%f", pBucket, minval, maxval); taosMemoryFree(pBucket); return NULL; } - pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(SFilePage))/pBucket->bytes; + pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(SFilePage)) / pBucket->bytes; pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC); pBucket->hashFunc = getHashFunc(pBucket->type); if (pBucket->hashFunc == NULL) { -// qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); + // qError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); taosMemoryFree(pBucket); return NULL; } @@ -270,8 +269,8 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, tMemBucketDestroy(pBucket); return NULL; } - -// qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); + + // qDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); return pBucket; } @@ -280,9 +279,9 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } - void* p = taosHashIterate(pBucket->groupPagesMap, NULL); - while(p) { - SArray** p1 = p; + void *p = taosHashIterate(pBucket->groupPagesMap, NULL); + while (p) { + SArray **p1 = p; p = taosHashIterate(pBucket->groupPagesMap, p); taosArrayDestroy(*p1); } @@ -341,7 +340,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { int32_t count = 0; int32_t bytes = pBucket->bytes; for (int32_t i = 0; i < size; ++i) { - char *d = (char *) data + i * bytes; + char *d = (char *)data + i * bytes; int32_t index = (pBucket->hashFunc)(pBucket, d); if (index < 0) { continue; @@ -365,11 +364,11 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = NULL; } - SArray* pPageIdList = (SArray*)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); + SArray *pPageIdList = (SArray *)taosHashGet(pBucket->groupPagesMap, &groupId, sizeof(groupId)); if (pPageIdList == NULL) { - SArray* pList = taosArrayInit(4, sizeof(int32_t)); + SArray *pList = taosArrayInit(4, sizeof(int32_t)); taosHashPut(pBucket->groupPagesMap, &groupId, sizeof(groupId), &pList, POINTER_BYTES); - pPageIdList = pList; + pPageIdList = pList; } pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); @@ -396,29 +395,29 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { * slot of the next segment. */ static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t slotIdx) { - int32_t j = slotIdx + 1; - while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) { - ++j; - } + int32_t j = slotIdx + 1; + while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) { + ++j; + } - assert(j < pMemBucket->numOfSlots); - return pMemBucket->pSlots[j].range; + assert(j < pMemBucket->numOfSlots); + return pMemBucket->pSlots[j].range; } static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index); -static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { +static double getIdenticalDataVal(tMemBucket *pMemBucket, int32_t slotIndex) { assert(isIdenticalData(pMemBucket, slotIndex)); tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex]; double finalResult = 0.0; if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) { - finalResult = (double) pSlot->range.i64MinVal; + finalResult = (double)pSlot->range.i64MinVal; } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) { - finalResult = (double) pSlot->range.u64MinVal; + finalResult = (double)pSlot->range.u64MinVal; } else { - finalResult = (double) pSlot->range.dMinVal; + finalResult = (double)pSlot->range.dMinVal; } return finalResult; @@ -445,14 +444,14 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) double maxOfThisSlot = 0; double minOfNextSlot = 0; if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) { - maxOfThisSlot = (double) pSlot->range.i64MaxVal; - minOfNextSlot = (double) next.i64MinVal; + maxOfThisSlot = (double)pSlot->range.i64MaxVal; + minOfNextSlot = (double)next.i64MinVal; } else if (IS_UNSIGNED_NUMERIC_TYPE(pMemBucket->type)) { - maxOfThisSlot = (double) pSlot->range.u64MaxVal; - minOfNextSlot = (double) next.u64MinVal; + maxOfThisSlot = (double)pSlot->range.u64MaxVal; + minOfNextSlot = (double)next.u64MinVal; } else { - maxOfThisSlot = (double) pSlot->range.dMaxVal; - minOfNextSlot = (double) next.dMinVal; + maxOfThisSlot = (double)pSlot->range.dMaxVal; + minOfNextSlot = (double)next.dMinVal; } assert(minOfNextSlot > maxOfThisSlot); @@ -478,32 +477,32 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) return val; } else { // incur a second round bucket split - if (isIdenticalData(pMemBucket, i)) { - return getIdenticalDataVal(pMemBucket, i); - } + if (isIdenticalData(pMemBucket, i)) { + return getIdenticalDataVal(pMemBucket, i); + } - // try next round - pMemBucket->times += 1; -// qDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times); + // try next round + pMemBucket->times += 1; + // qDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times); - pMemBucket->range = pSlot->range; - pMemBucket->total = 0; + pMemBucket->range = pSlot->range; + pMemBucket->total = 0; - resetSlotInfo(pMemBucket); + resetSlotInfo(pMemBucket); - int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); + int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); SIDList list = taosHashGet(pMemBucket->groupPagesMap, &groupId, sizeof(groupId)); - assert(list->size > 0); + assert(list->size > 0); - for (int32_t f = 0; f < list->size; ++f) { - SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); - SFilePage *pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); + for (int32_t f = 0; f < list->size; ++f) { + SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); + SFilePage *pg = getBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); - tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); - releaseBufPageInfo(pMemBucket->pBuffer, pgInfo); - } + tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); + releaseBufPageInfo(pMemBucket->pBuffer, pgInfo); + } - return getPercentileImpl(pMemBucket, count - num, fraction); + return getPercentileImpl(pMemBucket, count - num, fraction); } } else { num += pSlot->info.size; @@ -527,7 +526,7 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { // find the min/max value, no need to scan all data in bucket if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { - MinMaxEntry* pRange = &pMemBucket->range; + MinMaxEntry *pRange = &pMemBucket->range; if (IS_SIGNED_NUMERIC_TYPE(pMemBucket->type)) { double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->i64MaxVal : pRange->i64MinVal); @@ -536,11 +535,11 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { double v = (double)(fabs(percent - 100) < DBL_EPSILON ? pRange->u64MaxVal : pRange->u64MinVal); return v; } else { - return fabs(percent - 100) < DBL_EPSILON? pRange->dMaxVal:pRange->dMinVal; + return fabs(percent - 100) < DBL_EPSILON ? pRange->dMaxVal : pRange->dMinVal; } } - double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); + double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); // do put data by using buckets int32_t orderIdx = (int32_t)percentVal; diff --git a/source/libs/function/src/tscript.c b/source/libs/function/src/tscript.c index 93e8ecf5cf..768581285b 100644 --- a/source/libs/function/src/tscript.c +++ b/source/libs/function/src/tscript.c @@ -13,10 +13,10 @@ * along with this program. If not, see . */ -#include "os.h" #include "tscript.h" -#include "ttypes.h" +#include "os.h" #include "tstrbuild.h" +#include "ttypes.h" //#include "queryLog.h" #include "ttokendef.h" #if 0 diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index 060a92f864..1483394509 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -13,33 +13,35 @@ * along with this program. If not, see . */ #include "uv.h" + #include "os.h" + +#include "builtinsimpl.h" #include "fnLog.h" -#include "tudf.h" -#include "tudfInt.h" +#include "functionMgt.h" +#include "querynodes.h" #include "tarray.h" -#include "tglobal.h" #include "tdatablock.h" -#include "querynodes.h" -#include "builtinsimpl.h" -#include "functionMgt.h" +#include "tglobal.h" +#include "tudf.h" +#include "tudfInt.h" typedef struct SUdfdData { - bool startCalled; - bool needCleanUp; - uv_loop_t loop; - uv_thread_t thread; - uv_barrier_t barrier; - uv_process_t process; + bool startCalled; + bool needCleanUp; + uv_loop_t loop; + uv_thread_t thread; + uv_barrier_t barrier; + uv_process_t process; #ifdef WINDOWS - HANDLE jobHandle; + HANDLE jobHandle; #endif - int spawnErr; - uv_pipe_t ctrlPipe; - uv_async_t stopAsync; - int32_t stopCalled; + int spawnErr; + uv_pipe_t ctrlPipe; + uv_async_t stopAsync; + int32_t stopCalled; - int32_t dnodeId; + int32_t dnodeId; } SUdfdData; SUdfdData udfdGlobal = {0}; @@ -48,11 +50,11 @@ int32_t udfStartUdfd(int32_t startDnodeId); int32_t udfStopUdfd(); static int32_t udfSpawnUdfd(SUdfdData *pData); -void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); -static int32_t udfSpawnUdfd(SUdfdData* pData); -static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg); -static void udfUdfdStopAsyncCb(uv_async_t *async); -static void udfWatchUdfd(void *args); +void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal); +static int32_t udfSpawnUdfd(SUdfdData *pData); +static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg); +static void udfUdfdStopAsyncCb(uv_async_t *async); +static void udfWatchUdfd(void *args); void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { fnInfo("udfd process exited with status %" PRId64 ", signal %d", exitStatus, termSignal); @@ -65,27 +67,27 @@ void udfUdfdExit(uv_process_t *process, int64_t exitStatus, int termSignal) { } } -static int32_t udfSpawnUdfd(SUdfdData* pData) { +static int32_t udfSpawnUdfd(SUdfdData *pData) { fnInfo("start to init udfd"); uv_process_options_t options = {0}; char path[PATH_MAX] = {0}; if (tsProcPath == NULL) { path[0] = '.'; - #ifdef WINDOWS +#ifdef WINDOWS GetModuleFileName(NULL, path, PATH_MAX); taosDirName(path); - #elif defined(_TD_DARWIN_64) +#elif defined(_TD_DARWIN_64) uint32_t pathSize = sizeof(path); _NSGetExecutablePath(path, &pathSize); taosDirName(path); - #endif +#endif } else { strncpy(path, tsProcPath, PATH_MAX); taosDirName(path); } #ifdef WINDOWS - if (strlen(path)==0) { + if (strlen(path) == 0) { strcat(path, "udfd.exe"); } else { strcat(path, "\\udfd.exe"); @@ -93,7 +95,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { #else strcat(path, "/udfd"); #endif - char* argsUdfd[] = {path, "-c", configDir, NULL}; + char *argsUdfd[] = {path, "-c", configDir, NULL}; options.args = argsUdfd; options.file = path; @@ -103,7 +105,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { uv_stdio_container_t child_stdio[3]; child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE; - child_stdio[0].data.stream = (uv_stream_t*) &pData->ctrlPipe; + child_stdio[0].data.stream = (uv_stream_t *)&pData->ctrlPipe; child_stdio[1].flags = UV_IGNORE; child_stdio[2].flags = UV_INHERIT_FD; child_stdio[2].data.fd = 2; @@ -117,12 +119,12 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { snprintf(dnodeIdEnvItem, 32, "%s=%d", "DNODE_ID", pData->dnodeId); float numCpuCores = 4; taosGetCpuCores(&numCpuCores); - snprintf(thrdPoolSizeEnvItem,32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores*2); - char* envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; + snprintf(thrdPoolSizeEnvItem, 32, "%s=%d", "UV_THREADPOOL_SIZE", (int)numCpuCores * 2); + char *envUdfd[] = {dnodeIdEnvItem, thrdPoolSizeEnvItem, NULL}; options.env = envUdfd; int err = uv_spawn(&pData->loop, &pData->process, &options); - pData->process.data = (void*)pData; + pData->process.data = (void *)pData; #ifdef WINDOWS // End udfd.exe by Job. @@ -135,7 +137,8 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { JOBOBJECT_EXTENDED_LIMIT_INFORMATION limit_info; memset(&limit_info, 0x0, sizeof(limit_info)); limit_info.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE; - bool set_auto_kill_ok = SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info)); + bool set_auto_kill_ok = + SetInformationJobObject(pData->jobHandle, JobObjectExtendedLimitInformation, &limit_info, sizeof(limit_info)); if (!set_auto_kill_ok) { fnError("Set job auto kill udfd failed."); } @@ -150,7 +153,7 @@ static int32_t udfSpawnUdfd(SUdfdData* pData) { return err; } -static void udfUdfdCloseWalkCb(uv_handle_t* handle, void* arg) { +static void udfUdfdCloseWalkCb(uv_handle_t *handle, void *arg) { if (!uv_is_closing(handle)) { uv_close(handle, NULL); } @@ -180,8 +183,7 @@ static void udfWatchUdfd(void *args) { int32_t udfStartUdfd(int32_t startDnodeId) { if (!tsStartUdfd) { - fnInfo("start udfd is disabled.") - return 0; + fnInfo("start udfd is disabled.") return 0; } SUdfdData *pData = &udfdGlobal; if (pData->startCalled) { @@ -212,8 +214,7 @@ int32_t udfStartUdfd(int32_t startDnodeId) { int32_t udfStopUdfd() { SUdfdData *pData = &udfdGlobal; - fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", - pData->needCleanUp, pData->spawnErr); + fnInfo("udfd start to stop, need cleanup:%d, spawn err:%d", pData->needCleanUp, pData->spawnErr); if (!pData->needCleanUp || atomic_load_32(&pData->stopCalled)) { return 0; } @@ -237,107 +238,91 @@ int32_t udfStopUdfd() { typedef void *QUEUE[2]; /* Private macros. */ -#define QUEUE_NEXT(q) (*(QUEUE **) &((*(q))[0])) -#define QUEUE_PREV(q) (*(QUEUE **) &((*(q))[1])) -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) +#define QUEUE_NEXT(q) (*(QUEUE **)&((*(q))[0])) +#define QUEUE_PREV(q) (*(QUEUE **)&((*(q))[1])) +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) /* Public macros. */ -#define QUEUE_DATA(ptr, type, field) \ - ((type *) ((char *) (ptr) - offsetof(type, field))) +#define QUEUE_DATA(ptr, type, field) ((type *)((char *)(ptr)-offsetof(type, field))) /* Important note: mutating the list while QUEUE_FOREACH is * iterating over its elements results in undefined behavior. */ -#define QUEUE_FOREACH(q, h) \ - for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) - -#define QUEUE_EMPTY(q) \ - ((const QUEUE *) (q) == (const QUEUE *) QUEUE_NEXT(q)) - -#define QUEUE_HEAD(q) \ - (QUEUE_NEXT(q)) - -#define QUEUE_INIT(q) \ - do { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } \ - while (0) - -#define QUEUE_ADD(h, n) \ - do { \ - QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ - QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV(h) = QUEUE_PREV(n); \ - QUEUE_PREV_NEXT(h) = (h); \ - } \ - while (0) - -#define QUEUE_SPLIT(h, q, n) \ - do { \ - QUEUE_PREV(n) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(n) = (n); \ - QUEUE_NEXT(n) = (q); \ - QUEUE_PREV(h) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(h) = (h); \ - QUEUE_PREV(q) = (n); \ - } \ - while (0) - -#define QUEUE_MOVE(h, n) \ - do { \ - if (QUEUE_EMPTY(h)) \ - QUEUE_INIT(n); \ - else { \ - QUEUE* q = QUEUE_HEAD(h); \ - QUEUE_SPLIT(h, q, n); \ - } \ - } \ - while (0) - -#define QUEUE_INSERT_HEAD(h, q) \ - do { \ - QUEUE_NEXT(q) = QUEUE_NEXT(h); \ - QUEUE_PREV(q) = (h); \ - QUEUE_NEXT_PREV(q) = (q); \ - QUEUE_NEXT(h) = (q); \ - } \ - while (0) - -#define QUEUE_INSERT_TAIL(h, q) \ - do { \ - QUEUE_NEXT(q) = (h); \ - QUEUE_PREV(q) = QUEUE_PREV(h); \ - QUEUE_PREV_NEXT(q) = (q); \ - QUEUE_PREV(h) = (q); \ - } \ - while (0) - -#define QUEUE_REMOVE(q) \ - do { \ - QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ - QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ - } \ - while (0) - - -enum { - UV_TASK_CONNECT = 0, - UV_TASK_REQ_RSP = 1, - UV_TASK_DISCONNECT = 2 -}; +#define QUEUE_FOREACH(q, h) for ((q) = QUEUE_NEXT(h); (q) != (h); (q) = QUEUE_NEXT(q)) + +#define QUEUE_EMPTY(q) ((const QUEUE *)(q) == (const QUEUE *)QUEUE_NEXT(q)) + +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +#define QUEUE_INIT(q) \ + do { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } while (0) + +#define QUEUE_ADD(h, n) \ + do { \ + QUEUE_PREV_NEXT(h) = QUEUE_NEXT(n); \ + QUEUE_NEXT_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV(h) = QUEUE_PREV(n); \ + QUEUE_PREV_NEXT(h) = (h); \ + } while (0) + +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } while (0) + +#define QUEUE_MOVE(h, n) \ + do { \ + if (QUEUE_EMPTY(h)) \ + QUEUE_INIT(n); \ + else { \ + QUEUE *q = QUEUE_HEAD(h); \ + QUEUE_SPLIT(h, q, n); \ + } \ + } while (0) + +#define QUEUE_INSERT_HEAD(h, q) \ + do { \ + QUEUE_NEXT(q) = QUEUE_NEXT(h); \ + QUEUE_PREV(q) = (h); \ + QUEUE_NEXT_PREV(q) = (q); \ + QUEUE_NEXT(h) = (q); \ + } while (0) + +#define QUEUE_INSERT_TAIL(h, q) \ + do { \ + QUEUE_NEXT(q) = (h); \ + QUEUE_PREV(q) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(q) = (q); \ + QUEUE_PREV(h) = (q); \ + } while (0) + +#define QUEUE_REMOVE(q) \ + do { \ + QUEUE_PREV_NEXT(q) = QUEUE_NEXT(q); \ + QUEUE_NEXT_PREV(q) = QUEUE_PREV(q); \ + } while (0) + +enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 }; int64_t gUdfTaskSeqNum = 0; typedef struct SUdfcFuncStub { - char udfName[TSDB_FUNC_NAME_LEN]; + char udfName[TSDB_FUNC_NAME_LEN]; UdfcFuncHandle handle; - int32_t refCount; - int64_t lastRefTime; + int32_t refCount; + int64_t lastRefTime; } SUdfcFuncStub; typedef struct SUdfcProxy { - char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; + char udfdPipeName[PATH_MAX + UDF_LISTEN_PIPE_NAME_LEN + 2]; uv_barrier_t initBarrier; uv_loop_t uvLoop; @@ -352,7 +337,7 @@ typedef struct SUdfcProxy { QUEUE uvProcTaskQueue; uv_mutex_t udfStubsMutex; - SArray* udfStubs; // SUdfcFuncStub + SArray *udfStubs; // SUdfcFuncStub int8_t initialized; } SUdfcProxy; @@ -361,8 +346,8 @@ SUdfcProxy gUdfdProxy = {0}; typedef struct SUdfcUvSession { SUdfcProxy *udfc; - int64_t severHandle; - uv_pipe_t *udfUvPipe; + int64_t severHandle; + uv_pipe_t *udfUvPipe; int8_t outputType; int32_t outputLen; @@ -373,12 +358,12 @@ typedef struct SUdfcUvSession { typedef struct SClientUvTaskNode { SUdfcProxy *udfc; - int8_t type; - int errCode; + int8_t type; + int errCode; uv_pipe_t *pipe; - int64_t seqNum; + int64_t seqNum; uv_buf_t reqBuf; uv_sem_t taskSem; @@ -398,15 +383,15 @@ typedef struct SClientUdfTask { union { struct { - SUdfSetupRequest req; + SUdfSetupRequest req; SUdfSetupResponse rsp; } _setup; struct { - SUdfCallRequest req; + SUdfCallRequest req; SUdfCallResponse rsp; } _call; struct { - SUdfTeardownRequest req; + SUdfTeardownRequest req; SUdfTeardownResponse rsp; } _teardown; }; @@ -414,55 +399,55 @@ typedef struct SClientUdfTask { } SClientUdfTask; typedef struct SClientConnBuf { - char *buf; + char *buf; int32_t len; int32_t cap; int32_t total; } SClientConnBuf; typedef struct SClientUvConn { - uv_pipe_t *pipe; - QUEUE taskQueue; - SClientConnBuf readBuf; + uv_pipe_t *pipe; + QUEUE taskQueue; + SClientConnBuf readBuf; SUdfcUvSession *session; } SClientUvConn; enum { - UDFC_STATE_INITAL = 0, // initial state - UDFC_STATE_STARTNG, // starting after udfcOpen - UDFC_STATE_READY, // started and begin to receive quests - UDFC_STATE_STOPPING, // stopping after udfcClose + UDFC_STATE_INITAL = 0, // initial state + UDFC_STATE_STARTNG, // starting after udfcOpen + UDFC_STATE_READY, // started and begin to receive quests + UDFC_STATE_STOPPING, // stopping after udfcClose }; -int32_t getUdfdPipeName(char* pipeName, int32_t size); +int32_t getUdfdPipeName(char *pipeName, int32_t size); int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup); -void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request); -int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state); -void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state); +void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request); +int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state); +void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state); int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call); -void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call); +void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call); int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown); -void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown); -int32_t encodeUdfRequest(void** buf, const SUdfRequest* request); -void* decodeUdfRequest(const void* buf, SUdfRequest* request); +void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown); +int32_t encodeUdfRequest(void **buf, const SUdfRequest *request); +void *decodeUdfRequest(const void *buf, SUdfRequest *request); int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp); -void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp); +void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp); int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp); -void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp); -int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp); -void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse); -int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp); -void* decodeUdfResponse(const void* buf, SUdfResponse* rsp); -void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); -void freeUdfColumn(SUdfColumn* col); -void freeUdfDataDataBlock(SUdfDataBlock *block); -void freeUdfInterBuf(SUdfInterBuf *buf); +void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp); +int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp); +void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse); +int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp); +void *decodeUdfResponse(const void *buf, SUdfResponse *rsp); +void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta); +void freeUdfColumn(SUdfColumn *col); +void freeUdfDataDataBlock(SUdfDataBlock *block); +void freeUdfInterBuf(SUdfInterBuf *buf); int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock); int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block); int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output); int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output); -int32_t getUdfdPipeName(char* pipeName, int32_t size) { +int32_t getUdfdPipeName(char *pipeName, int32_t size) { char dnodeId[8] = {0}; size_t dnodeIdSize = sizeof(dnodeId); int32_t err = uv_os_getenv(UDF_DNODE_ID_ENV_NAME, dnodeId, &dnodeIdSize); @@ -471,7 +456,8 @@ int32_t getUdfdPipeName(char* pipeName, int32_t size) { dnodeId[0] = '1'; } #ifdef _WIN32 - snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX,MurmurHash3_32(tsDataDir, strlen(tsDataDir)), dnodeId); + snprintf(pipeName, size, "%s.%x.%s", UDF_LISTEN_PIPE_NAME_PREFIX, MurmurHash3_32(tsDataDir, strlen(tsDataDir)), + dnodeId); #else snprintf(pipeName, size, "%s/%s%s", tsDataDir, UDF_LISTEN_PIPE_NAME_PREFIX, dnodeId); #endif @@ -485,12 +471,12 @@ int32_t encodeUdfSetupRequest(void **buf, const SUdfSetupRequest *setup) { return len; } -void* decodeUdfSetupRequest(const void* buf, SUdfSetupRequest *request) { +void *decodeUdfSetupRequest(const void *buf, SUdfSetupRequest *request) { buf = taosDecodeBinaryTo(buf, request->udfName, TSDB_FUNC_NAME_LEN); - return (void*)buf; + return (void *)buf; } -int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) { +int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf *state) { int32_t len = 0; len += taosEncodeFixedI8(buf, state->numOfResult); len += taosEncodeFixedI32(buf, state->bufLen); @@ -498,11 +484,11 @@ int32_t encodeUdfInterBuf(void **buf, const SUdfInterBuf* state) { return len; } -void* decodeUdfInterBuf(const void* buf, SUdfInterBuf* state) { +void *decodeUdfInterBuf(const void *buf, SUdfInterBuf *state) { buf = taosDecodeFixedI8(buf, &state->numOfResult); buf = taosDecodeFixedI32(buf, &state->bufLen); - buf = taosDecodeBinary(buf, (void**)&state->buf, state->bufLen); - return (void*)buf; + buf = taosDecodeBinary(buf, (void **)&state->buf, state->bufLen); + return (void *)buf; } int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) { @@ -525,7 +511,7 @@ int32_t encodeUdfCallRequest(void **buf, const SUdfCallRequest *call) { return len; } -void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) { +void *decodeUdfCallRequest(const void *buf, SUdfCallRequest *call) { buf = taosDecodeFixedI64(buf, &call->udfHandle); buf = taosDecodeFixedI8(buf, &call->callType); switch (call->callType) { @@ -547,7 +533,7 @@ void* decodeUdfCallRequest(const void* buf, SUdfCallRequest* call) { buf = decodeUdfInterBuf(buf, &call->interBuf); break; } - return (void*)buf; + return (void *)buf; } int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown) { @@ -556,17 +542,17 @@ int32_t encodeUdfTeardownRequest(void **buf, const SUdfTeardownRequest *teardown return len; } -void* decodeUdfTeardownRequest(const void* buf, SUdfTeardownRequest *teardown) { +void *decodeUdfTeardownRequest(const void *buf, SUdfTeardownRequest *teardown) { buf = taosDecodeFixedI64(buf, &teardown->udfHandle); - return (void*)buf; + return (void *)buf; } -int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) { +int32_t encodeUdfRequest(void **buf, const SUdfRequest *request) { int32_t len = 0; if (buf == NULL) { len += sizeof(request->msgLen); } else { - *(int32_t*)(*buf) = request->msgLen; + *(int32_t *)(*buf) = request->msgLen; *buf = POINTER_SHIFT(*buf, sizeof(request->msgLen)); } len += taosEncodeFixedI64(buf, request->seqNum); @@ -581,8 +567,8 @@ int32_t encodeUdfRequest(void** buf, const SUdfRequest* request) { return len; } -void* decodeUdfRequest(const void* buf, SUdfRequest* request) { - request->msgLen = *(int32_t*)(buf); +void *decodeUdfRequest(const void *buf, SUdfRequest *request) { + request->msgLen = *(int32_t *)(buf); buf = POINTER_SHIFT(buf, sizeof(request->msgLen)); buf = taosDecodeFixedI64(buf, &request->seqNum); @@ -595,7 +581,7 @@ void* decodeUdfRequest(const void* buf, SUdfRequest* request) { } else if (request->type == UDF_TASK_TEARDOWN) { buf = decodeUdfTeardownRequest(buf, &request->teardown); } - return (void*)buf; + return (void *)buf; } int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { @@ -607,12 +593,12 @@ int32_t encodeUdfSetupResponse(void **buf, const SUdfSetupResponse *setupRsp) { return len; } -void* decodeUdfSetupResponse(const void* buf, SUdfSetupResponse* setupRsp) { +void *decodeUdfSetupResponse(const void *buf, SUdfSetupResponse *setupRsp) { buf = taosDecodeFixedI64(buf, &setupRsp->udfHandle); buf = taosDecodeFixedI8(buf, &setupRsp->outputType); buf = taosDecodeFixedI32(buf, &setupRsp->outputLen); buf = taosDecodeFixedI32(buf, &setupRsp->bufSize); - return (void*)buf; + return (void *)buf; } int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { @@ -638,7 +624,7 @@ int32_t encodeUdfCallResponse(void **buf, const SUdfCallResponse *callRsp) { return len; } -void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) { +void *decodeUdfCallResponse(const void *buf, SUdfCallResponse *callRsp) { buf = taosDecodeFixedI8(buf, &callRsp->callType); switch (callRsp->callType) { case TSDB_UDF_CALL_SCALA_PROC: @@ -657,30 +643,26 @@ void* decodeUdfCallResponse(const void* buf, SUdfCallResponse* callRsp) { buf = decodeUdfInterBuf(buf, &callRsp->resultBuf); break; } - return (void*)buf; + return (void *)buf; } -int32_t encodeUdfTeardownResponse(void** buf, const SUdfTeardownResponse* teardownRsp) { - return 0; -} +int32_t encodeUdfTeardownResponse(void **buf, const SUdfTeardownResponse *teardownRsp) { return 0; } -void* decodeUdfTeardownResponse(const void* buf, SUdfTeardownResponse* teardownResponse) { - return (void*)buf; -} +void *decodeUdfTeardownResponse(const void *buf, SUdfTeardownResponse *teardownResponse) { return (void *)buf; } -int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { +int32_t encodeUdfResponse(void **buf, const SUdfResponse *rsp) { int32_t len = 0; if (buf == NULL) { len += sizeof(rsp->msgLen); } else { - *(int32_t*)(*buf) = rsp->msgLen; + *(int32_t *)(*buf) = rsp->msgLen; *buf = POINTER_SHIFT(*buf, sizeof(rsp->msgLen)); } if (buf == NULL) { len += sizeof(rsp->seqNum); } else { - *(int64_t*)(*buf) = rsp->seqNum; + *(int64_t *)(*buf) = rsp->seqNum; *buf = POINTER_SHIFT(*buf, sizeof(rsp->seqNum)); } @@ -705,10 +687,10 @@ int32_t encodeUdfResponse(void** buf, const SUdfResponse* rsp) { return len; } -void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { - rsp->msgLen = *(int32_t*)(buf); +void *decodeUdfResponse(const void *buf, SUdfResponse *rsp) { + rsp->msgLen = *(int32_t *)(buf); buf = POINTER_SHIFT(buf, sizeof(rsp->msgLen)); - rsp->seqNum = *(int64_t*)(buf); + rsp->seqNum = *(int64_t *)(buf); buf = POINTER_SHIFT(buf, sizeof(rsp->seqNum)); buf = taosDecodeFixedI64(buf, &rsp->seqNum); buf = taosDecodeFixedI8(buf, &rsp->type); @@ -728,7 +710,7 @@ void* decodeUdfResponse(const void* buf, SUdfResponse* rsp) { fnError("decode udf response, invalid udf response type %d", rsp->type); break; } - return (void*)buf; + return (void *)buf; } void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { @@ -745,9 +727,7 @@ void freeUdfColumnData(SUdfColumnData *data, SUdfColumnMeta *meta) { } } -void freeUdfColumn(SUdfColumn* col) { - freeUdfColumnData(&col->colData, &col->colMeta); -} +void freeUdfColumn(SUdfColumn *col) { freeUdfColumnData(&col->colData, &col->colMeta); } void freeUdfDataDataBlock(SUdfDataBlock *block) { for (int32_t i = 0; i < block->numOfCols; ++i) { @@ -764,15 +744,14 @@ void freeUdfInterBuf(SUdfInterBuf *buf) { buf->buf = NULL; } - int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlock) { udfBlock->numOfRows = block->info.rows; udfBlock->numOfCols = taosArrayGetSize(block->pDataBlock); - udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn*)); + udfBlock->udfCols = taosMemoryCalloc(taosArrayGetSize(block->pDataBlock), sizeof(SUdfColumn *)); for (int32_t i = 0; i < udfBlock->numOfCols; ++i) { udfBlock->udfCols[i] = taosMemoryCalloc(1, sizeof(SUdfColumn)); - SColumnInfoData *col= (SColumnInfoData*)taosArrayGet(block->pDataBlock, i); - SUdfColumn *udfCol = udfBlock->udfCols[i]; + SColumnInfoData *col = (SColumnInfoData *)taosArrayGet(block->pDataBlock, i); + SUdfColumn *udfCol = udfBlock->udfCols[i]; udfCol->colMeta.type = col->info.type; udfCol->colMeta.bytes = col->info.bytes; udfCol->colMeta.scale = col->info.scale; @@ -790,12 +769,12 @@ int32_t convertDataBlockToUdfDataBlock(SSDataBlock *block, SUdfDataBlock *udfBlo udfCol->colData.fixLenCol.nullBitmapLen = BitmapLen(udfCol->colData.numOfRows); int32_t bitmapLen = udfCol->colData.fixLenCol.nullBitmapLen; udfCol->colData.fixLenCol.nullBitmap = taosMemoryMalloc(udfCol->colData.fixLenCol.nullBitmapLen); - char* bitmap = udfCol->colData.fixLenCol.nullBitmap; + char *bitmap = udfCol->colData.fixLenCol.nullBitmap; memcpy(bitmap, col->nullbitmap, bitmapLen); udfCol->colData.fixLenCol.dataLen = colDataGetLength(col, udfBlock->numOfRows); int32_t dataLen = udfCol->colData.fixLenCol.dataLen; udfCol->colData.fixLenCol.data = taosMemoryMalloc(udfCol->colData.fixLenCol.dataLen); - char* data = udfCol->colData.fixLenCol.data; + char *data = udfCol->colData.fixLenCol.data; memcpy(data, col->pData, dataLen); } } @@ -809,7 +788,7 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { block->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData)); taosArraySetSize(block->pDataBlock, 1); SColumnInfoData *col = taosArrayGet(block->pDataBlock, 0); - SUdfColumnMeta *meta = &udfCol->colMeta; + SUdfColumnMeta *meta = &udfCol->colMeta; col->info.precision = meta->precision; col->info.bytes = meta->bytes; col->info.scale = meta->scale; @@ -837,7 +816,7 @@ int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SS for (int32_t i = 0; i < numOfCols; ++i) { taosArrayPush(output->pDataBlock, (input + i)->columnData); - if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) { + if (IS_VAR_DATA_TYPE((input + i)->columnData->info.type)) { output->info.hasVarCol = true; } } @@ -852,71 +831,70 @@ int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { output->numOfRows = input->info.rows; output->columnData = taosMemoryMalloc(sizeof(SColumnInfoData)); - memcpy(output->columnData, - taosArrayGet(input->pDataBlock, 0), - sizeof(SColumnInfoData)); - output->colAlloced = true; + memcpy(output->columnData, taosArrayGet(input->pDataBlock, 0), sizeof(SColumnInfoData)); + output->colAlloced = true; return 0; } ////////////////////////////////////////////////////////////////////////////////////////////////////////////// -//memory layout |---SUdfAggRes----|-----final result-----|---inter result----| +// memory layout |---SUdfAggRes----|-----final result-----|---inter result----| typedef struct SUdfAggRes { int8_t finalResNum; int8_t interResNum; - char* finalResBuf; - char* interResBuf; + char *finalResBuf; + char *interResBuf; } SUdfAggRes; -void onUdfcPipeClose(uv_handle_t *handle); +void onUdfcPipeClose(uv_handle_t *handle); int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask); -void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); -bool isUdfcUvMsgComplete(SClientConnBuf *connBuf); -void udfcUvHandleRsp(SClientUvConn *conn); -void udfcUvHandleError(SClientUvConn *conn); -void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); -void onUdfcPipeWrite(uv_write_t *write, int status); -void onUdfcPipeConnect(uv_connect_t *connect, int status); +void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf); +bool isUdfcUvMsgComplete(SClientConnBuf *connBuf); +void udfcUvHandleRsp(SClientUvConn *conn); +void udfcUvHandleError(SClientUvConn *conn); +void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf); +void onUdfcPipeWrite(uv_write_t *write, int status); +void onUdfcPipeConnect(uv_connect_t *connect, int status); int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvTaskNode *uvTask); int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask); int32_t udfcStartUvTask(SClientUvTaskNode *uvTask); -void udfcAsyncTaskCb(uv_async_t *async); -void cleanUpUvTasks(SUdfcProxy *udfc); -void udfStopAsyncCb(uv_async_t *async); -void constructUdfService(void *argsThread); +void udfcAsyncTaskCb(uv_async_t *async); +void cleanUpUvTasks(SUdfcProxy *udfc); +void udfStopAsyncCb(uv_async_t *async); +void constructUdfService(void *argsThread); int32_t udfcRunUdfUvTask(SClientUdfTask *task, int8_t uvTaskType); int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle); -int compareUdfcFuncSub(const void* elem1, const void* elem2); +int compareUdfcFuncSub(const void *elem1, const void *elem2); int32_t doTeardownUdf(UdfcFuncHandle handle); int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, - SSDataBlock* output, SUdfInterBuf *newState); + SSDataBlock *output, SUdfInterBuf *newState); int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf); int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState); -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, + SUdfInterBuf *resultBuf); int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData); -int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output); +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output); int32_t udfcOpen(); int32_t udfcClose(); -int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle); -void releaseUdfFuncHandle(char* udfName); +int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle); +void releaseUdfFuncHandle(char *udfName); int32_t cleanUpUdfs(); -bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); +bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv); +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo); int32_t udfAggProcess(struct SqlFunctionCtx *pCtx); -int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock); +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock); -int compareUdfcFuncSub(const void* elem1, const void* elem2) { +int compareUdfcFuncSub(const void *elem1, const void *elem2) { SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1; SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2; return strcmp(stub1->udfName, stub2->udfName); } -int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { +int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { int32_t code = 0; uv_mutex_lock(&gUdfdProxy.udfStubsMutex); SUdfcFuncStub key = {0}; @@ -925,15 +903,15 @@ int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { if (stubIndex != -1) { SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); UdfcFuncHandle handle = foundStub->handle; - if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { *pHandle = foundStub->handle; ++foundStub->refCount; foundStub->lastRefTime = taosGetTimestampUs(); uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); return 0; } else { - fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", - udfName, foundStub->refCount, foundStub->lastRefTime); + fnInfo("invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", udfName, + foundStub->refCount, foundStub->lastRefTime); taosArrayRemove(gUdfdProxy.udfStubs, stubIndex); } } @@ -955,7 +933,7 @@ int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { return code; } -void releaseUdfFuncHandle(char* udfName) { +void releaseUdfFuncHandle(char *udfName) { uv_mutex_lock(&gUdfdProxy.udfStubsMutex); SUdfcFuncStub key = {0}; strcpy(key.udfName, udfName); @@ -977,11 +955,11 @@ int32_t cleanUpUdfs() { } uv_mutex_lock(&gUdfdProxy.udfStubsMutex); - if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) { + if (gUdfdProxy.udfStubs == NULL || taosArrayGetSize(gUdfdProxy.udfStubs) == 0) { uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); return TSDB_CODE_SUCCESS; } - SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); + SArray *udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub)); int32_t i = 0; while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) { SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i); @@ -989,13 +967,13 @@ int32_t cleanUpUdfs() { fnInfo("tear down udf. udf name: %s, handle: %p, ref count: %d", stub->udfName, stub->handle, stub->refCount); doTeardownUdf(stub->handle); } else { - fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p", - stub->udfName, stub->refCount, stub->lastRefTime, stub->handle); + fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %" PRId64 ", handle: %p", stub->udfName, + stub->refCount, stub->lastRefTime, stub->handle); UdfcFuncHandle handle = stub->handle; - if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) { + if (handle != NULL && ((SUdfcUvSession *)handle)->udfUvPipe != NULL) { taosArrayPush(udfStubs, stub); } else { - fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache", + fnInfo("udf invalid handle for %s, refCount: %d, last ref time: %" PRId64 ". remove it from cache", stub->udfName, stub->refCount, stub->lastRefTime); } } @@ -1009,7 +987,7 @@ int32_t cleanUpUdfs() { int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { UdfcFuncHandle handle = NULL; - int32_t code = acquireUdfFuncHandle(udfName, &handle); + int32_t code = acquireUdfFuncHandle(udfName, &handle); if (code != 0) { return code; } @@ -1020,8 +998,8 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } else { if (session->outputType != output->columnData->info.type || session->outputLen != output->columnData->info.bytes) { - fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", session->outputType, - session->outputLen, output->columnData->info.type, output->columnData->info.bytes); + fnError("udfc scalar function calculate error. type mismatch. session type: %d(%d), output type: %d(%d)", + session->outputType, session->outputLen, output->columnData->info.type, output->columnData->info.bytes); code = TSDB_CODE_UDF_INVALID_OUTPUT_TYPE; } } @@ -1029,7 +1007,7 @@ int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, return code; } -bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { +bool udfAggGetEnv(struct SFunctionNode *pFunc, SFuncExecEnv *pEnv) { if (fmIsScalarFunc(pFunc->funcId)) { return false; } @@ -1037,23 +1015,23 @@ bool udfAggGetEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv) { return true; } -bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo) { +bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo *pResultCellInfo) { if (functionSetup(pCtx, pResultCellInfo) != true) { return false; } UdfcFuncHandle handle; - int32_t udfCode = 0; + int32_t udfCode = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode); return false; } SUdfcUvSession *session = (SUdfcUvSession *)handle; - SUdfAggRes *udfRes = (SUdfAggRes*)GET_ROWCELL_INTERBUF(pResultCellInfo); - int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; + SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(pResultCellInfo); + int32_t envSize = sizeof(SUdfAggRes) + session->outputLen + session->bufSize; memset(udfRes, 0, envSize); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; SUdfInterBuf buf = {0}; if ((udfCode = doCallUdfAggInit(handle, &buf)) != 0) { @@ -1075,7 +1053,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult } int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { - int32_t udfCode = 0; + int32_t udfCode = 0; UdfcFuncHandle handle = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); @@ -1083,16 +1061,16 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { } SUdfcUvSession *session = handle; - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; + SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; - SInputColumnInfoData* pInput = &pCtx->input; - int32_t numOfCols = pInput->numOfInputCols; - int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; + SInputColumnInfoData *pInput = &pCtx->input; + int32_t numOfCols = pInput->numOfInputCols; + int32_t start = pInput->startRowIndex; + int32_t numOfRows = pInput->numOfRows; - SSDataBlock* pTempBlock = createDataBlock(); + SSDataBlock *pTempBlock = createDataBlock(); pTempBlock->info.rows = pInput->totalRows; pTempBlock->info.uid = pInput->uid; for (int32_t i = 0; i < numOfCols; ++i) { @@ -1101,9 +1079,7 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { SSDataBlock *inputBlock = blockDataExtractBlock(pTempBlock, start, numOfRows); - SUdfInterBuf state = {.buf = udfRes->interResBuf, - .bufLen = session->bufSize, - .numOfResult = udfRes->interResNum}; + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; SUdfInterBuf newState = {0}; udfCode = doCallUdfAggProcess(session, inputBlock, &state, &newState); @@ -1133,8 +1109,8 @@ int32_t udfAggProcess(struct SqlFunctionCtx *pCtx) { return udfCode; } -int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { - int32_t udfCode = 0; +int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock) { + int32_t udfCode = 0; UdfcFuncHandle handle = 0; if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) { fnError("udfAggProcess error. step acquireUdfFuncHandle. udf code: %d", udfCode); @@ -1142,17 +1118,14 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { } SUdfcUvSession *session = handle; - SUdfAggRes* udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); - udfRes->finalResBuf = (char*)udfRes + sizeof(SUdfAggRes); - udfRes->interResBuf = (char*)udfRes + sizeof(SUdfAggRes) + session->outputLen; - + SUdfAggRes *udfRes = (SUdfAggRes *)GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); + udfRes->finalResBuf = (char *)udfRes + sizeof(SUdfAggRes); + udfRes->interResBuf = (char *)udfRes + sizeof(SUdfAggRes) + session->outputLen; SUdfInterBuf resultBuf = {0}; - SUdfInterBuf state = {.buf = udfRes->interResBuf, - .bufLen = session->bufSize, - .numOfResult = udfRes->interResNum}; - int32_t udfCallCode= 0; - udfCallCode= doCallUdfAggFinalize(session, &state, &resultBuf); + SUdfInterBuf state = {.buf = udfRes->interResBuf, .bufLen = session->bufSize, .numOfResult = udfRes->interResNum}; + int32_t udfCallCode = 0; + udfCallCode = doCallUdfAggFinalize(session, &state, &resultBuf); if (udfCallCode != 0) { fnError("udfAggFinalize error. doCallUdfAggFinalize step. udf code:%d", udfCallCode); GET_RES_INFO(pCtx)->numOfRes = 0; @@ -1178,7 +1151,7 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) { void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; if (!QUEUE_EMPTY(&conn->taskQueue)) { - QUEUE* h = QUEUE_HEAD(&conn->taskQueue); + QUEUE *h = QUEUE_HEAD(&conn->taskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); task->errCode = 0; QUEUE_REMOVE(&task->procTaskQueue); @@ -1189,7 +1162,7 @@ void onUdfcPipeClose(uv_handle_t *handle) { } taosMemoryFree(conn->readBuf.buf); taosMemoryFree(conn); - taosMemoryFree((uv_pipe_t *) handle); + taosMemoryFree((uv_pipe_t *)handle); } int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode *uvTask) { @@ -1197,7 +1170,7 @@ int32_t udfcGetUdfTaskResultFromUvTask(SClientUdfTask *task, SClientUvTaskNode * if (uvTask->type == UV_TASK_REQ_RSP) { if (uvTask->rspBuf.base != NULL) { SUdfResponse rsp = {0}; - void* buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); + void *buf = decodeUdfResponse(uvTask->rspBuf.base, &rsp); assert(uvTask->rspBuf.len == POINTER_DISTANCE(buf, uvTask->rspBuf.base)); task->errCode = rsp.code; @@ -1273,7 +1246,7 @@ void udfcAllocateBuffer(uv_handle_t *handle, size_t suggestedSize, uv_buf_t *buf bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) { if (connBuf->total == -1 && connBuf->len >= sizeof(int32_t)) { - connBuf->total = *(int32_t *) (connBuf->buf); + connBuf->total = *(int32_t *)(connBuf->buf); } if (connBuf->len == connBuf->cap && connBuf->total == connBuf->cap) { fnDebug("udfc complete message is received, now handle it"); @@ -1284,15 +1257,15 @@ bool isUdfcUvMsgComplete(SClientConnBuf *connBuf) { void udfcUvHandleRsp(SClientUvConn *conn) { SClientConnBuf *connBuf = &conn->readBuf; - int64_t seqNum = *(int64_t *) (connBuf->buf + sizeof(int32_t)); // msglen then seqnum + int64_t seqNum = *(int64_t *)(connBuf->buf + sizeof(int32_t)); // msglen then seqnum if (QUEUE_EMPTY(&conn->taskQueue)) { - fnError("udfc no task waiting on connection. response seqnum:%"PRId64, seqNum); + fnError("udfc no task waiting on connection. response seqnum:%" PRId64, seqNum); return; } - bool found = false; + bool found = false; SClientUvTaskNode *taskFound = NULL; - QUEUE* h = QUEUE_NEXT(&conn->taskQueue); + QUEUE *h = QUEUE_NEXT(&conn->taskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); while (h != &conn->taskQueue) { @@ -1327,7 +1300,7 @@ void udfcUvHandleRsp(SClientUvConn *conn) { void udfcUvHandleError(SClientUvConn *conn) { fnDebug("handle error on conn: %p, pipe: %p", conn, conn->pipe); while (!QUEUE_EMPTY(&conn->taskQueue)) { - QUEUE* h = QUEUE_HEAD(&conn->taskQueue); + QUEUE *h = QUEUE_HEAD(&conn->taskQueue); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, connTaskQueue); task->errCode = TSDB_CODE_UDF_PIPE_READ_ERR; QUEUE_REMOVE(&task->connTaskQueue); @@ -1335,14 +1308,14 @@ void udfcUvHandleError(SClientUvConn *conn) { uv_sem_post(&task->taskSem); } - uv_close((uv_handle_t *) conn->pipe, onUdfcPipeClose); + uv_close((uv_handle_t *)conn->pipe, onUdfcPipeClose); } void onUdfcPipeRead(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) { fnDebug("udfc client %p, client read from pipe. nread: %zd", client, nread); if (nread == 0) return; - SClientUvConn *conn = client->data; + SClientUvConn *conn = client->data; SClientConnBuf *connBuf = &conn->readBuf; if (nread > 0) { connBuf->len += nread; @@ -1373,7 +1346,7 @@ void onUdfcPipeWrite(uv_write_t *write, int status) { void onUdfcPipeConnect(uv_connect_t *connect, int status) { SClientUvTaskNode *uvTask = connect->data; if (status != 0) { - fnError("client connect error, task seq: %"PRId64", code: %s", uvTask->seqNum, uv_strerror(status)); + fnError("client connect error, task seq: %" PRId64 ", code: %s", uvTask->seqNum, uv_strerror(status)); } uvTask->errCode = status; @@ -1392,7 +1365,7 @@ int32_t udfcInitializeUvTask(SClientUdfTask *task, int8_t uvTaskType, SClientUvT uvTask->pipe = task->session->udfUvPipe; SUdfRequest request; request.type = task->type; - request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1); + request.seqNum = atomic_fetch_add_64(&gUdfTaskSeqNum, 1); if (task->type == UDF_TASK_SETUP) { request.setup = task->_setup.req; @@ -1430,14 +1403,14 @@ int32_t udfcQueueUvTask(SClientUvTaskNode *uvTask) { uv_async_send(&udfc->loopTaskAync); uv_sem_wait(&uvTask->taskSem); - fnInfo("udfc uvTask finished. uvTask:%"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask); + fnInfo("udfc uvTask finished. uvTask:%" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask); uv_sem_destroy(&uvTask->taskSem); return 0; } int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { - fnDebug("event loop start uv task. uvTask: %"PRId64"-%d-%p", uvTask->seqNum, uvTask->type, uvTask); + fnDebug("event loop start uv task. uvTask: %" PRId64 "-%d-%p", uvTask->seqNum, uvTask->type, uvTask); int32_t code = 0; switch (uvTask->type) { @@ -1469,7 +1442,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { } else { uv_write_t *write = taosMemoryMalloc(sizeof(uv_write_t)); write->data = pipe->data; - QUEUE* connTaskQueue = &((SClientUvConn*)pipe->data)->taskQueue; + QUEUE *connTaskQueue = &((SClientUvConn *)pipe->data)->taskQueue; QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite); if (err != 0) { @@ -1492,8 +1465,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { break; } default: { - fnError("udfc event loop unknown task type.") - break; + fnError("udfc event loop unknown task type.") break; } } @@ -1502,17 +1474,17 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { void udfcAsyncTaskCb(uv_async_t *async) { SUdfcProxy *udfc = async->data; - QUEUE wq; + QUEUE wq; uv_mutex_lock(&udfc->taskQueueMutex); QUEUE_MOVE(&udfc->taskQueue, &wq); uv_mutex_unlock(&udfc->taskQueueMutex); while (!QUEUE_EMPTY(&wq)) { - QUEUE* h = QUEUE_HEAD(&wq); + QUEUE *h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); - int32_t code = udfcStartUvTask(task); + int32_t code = udfcStartUvTask(task); if (code == 0) { QUEUE_INSERT_TAIL(&udfc->uvProcTaskQueue, &task->procTaskQueue); } else { @@ -1520,19 +1492,17 @@ void udfcAsyncTaskCb(uv_async_t *async) { uv_sem_post(&task->taskSem); } } - } void cleanUpUvTasks(SUdfcProxy *udfc) { - fnDebug("clean up uv tasks") - QUEUE wq; + fnDebug("clean up uv tasks") QUEUE wq; uv_mutex_lock(&udfc->taskQueueMutex); QUEUE_MOVE(&udfc->taskQueue, &wq); uv_mutex_unlock(&udfc->taskQueueMutex); while (!QUEUE_EMPTY(&wq)) { - QUEUE* h = QUEUE_HEAD(&wq); + QUEUE *h = QUEUE_HEAD(&wq); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, recvTaskQueue); if (udfc->udfcState == UDFC_STATE_STOPPING) { @@ -1542,7 +1512,7 @@ void cleanUpUvTasks(SUdfcProxy *udfc) { } while (!QUEUE_EMPTY(&udfc->uvProcTaskQueue)) { - QUEUE* h = QUEUE_HEAD(&udfc->uvProcTaskQueue); + QUEUE *h = QUEUE_HEAD(&udfc->uvProcTaskQueue); QUEUE_REMOVE(h); SClientUvTaskNode *task = QUEUE_DATA(h, SClientUvTaskNode, procTaskQueue); if (udfc->udfcState == UDFC_STATE_STOPPING) { @@ -1572,7 +1542,7 @@ void constructUdfService(void *argsThread) { QUEUE_INIT(&udfc->taskQueue); QUEUE_INIT(&udfc->uvProcTaskQueue); uv_barrier_wait(&udfc->initBarrier); - //TODO return value of uv_run + // TODO return value of uv_run uv_run(&udfc->uvLoop, UV_RUN_DEFAULT); uv_loop_close(&udfc->uvLoop); @@ -1596,8 +1566,7 @@ int32_t udfcOpen() { uv_barrier_wait(&proxy->initBarrier); uv_mutex_init(&proxy->udfStubsMutex); proxy->udfStubs = taosArrayInit(8, sizeof(SUdfcFuncStub)); - fnInfo("udfc initialized") - return 0; + fnInfo("udfc initialized") return 0; } int32_t udfcClose() { @@ -1644,7 +1613,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { if (gUdfdProxy.udfcState != UDFC_STATE_READY) { return TSDB_CODE_UDF_INVALID_STATE; } - SClientUdfTask *task = taosMemoryCalloc(1,sizeof(SClientUdfTask)); + SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; task->session = taosMemoryCalloc(1, sizeof(SUdfcUvSession)); task->session->udfc = &gUdfdProxy; @@ -1681,16 +1650,16 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { } int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2, - SSDataBlock* output, SUdfInterBuf *newState) { + SSDataBlock *output, SUdfInterBuf *newState) { fnDebug("udfc call udf. callType: %d, funcHandle: %p", callType, handle); - SUdfcUvSession *session = (SUdfcUvSession *) handle; + SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { fnError("No pipe to udfd"); return TSDB_CODE_UDF_PIPE_NO_PIPE; } SClientUdfTask *task = taosMemoryCalloc(1, sizeof(SClientUdfTask)); task->errCode = 0; - task->session = (SUdfcUvSession *) handle; + task->session = (SUdfcUvSession *)handle; task->type = UDF_TASK_CALL; SUdfCallRequest *req = &task->_call.req; @@ -1767,15 +1736,16 @@ int32_t doCallUdfAggInit(UdfcFuncHandle handle, SUdfInterBuf *interBuf) { // input: block, state // output: interbuf, int32_t doCallUdfAggProcess(UdfcFuncHandle handle, SSDataBlock *block, SUdfInterBuf *state, SUdfInterBuf *newState) { - int8_t callType = TSDB_UDF_CALL_AGG_PROC; + int8_t callType = TSDB_UDF_CALL_AGG_PROC; int32_t err = callUdf(handle, callType, block, state, NULL, NULL, newState); return err; } // input: interbuf1, interbuf2 // output: resultBuf -int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf) { - int8_t callType = TSDB_UDF_CALL_AGG_MERGE; +int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, + SUdfInterBuf *resultBuf) { + int8_t callType = TSDB_UDF_CALL_AGG_MERGE; int32_t err = callUdf(handle, callType, NULL, interBuf1, interBuf2, NULL, resultBuf); return err; } @@ -1783,17 +1753,17 @@ int32_t doCallUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfIn // input: interBuf // output: resultData int32_t doCallUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfInterBuf *resultData) { - int8_t callType = TSDB_UDF_CALL_AGG_FIN; + int8_t callType = TSDB_UDF_CALL_AGG_FIN; int32_t err = callUdf(handle, callType, NULL, interBuf, NULL, NULL, resultData); return err; } -int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { - int8_t callType = TSDB_UDF_CALL_SCALA_PROC; +int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output) { + int8_t callType = TSDB_UDF_CALL_SCALA_PROC; SSDataBlock inputBlock = {0}; convertScalarParamToDataBlock(input, numOfCols, &inputBlock); SSDataBlock resultBlock = {0}; - int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); + int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); if (err == 0) { convertDataBlockToScalarParm(&resultBlock, output); taosArrayDestroy(resultBlock.pDataBlock); @@ -1804,7 +1774,7 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t } int32_t doTeardownUdf(UdfcFuncHandle handle) { - SUdfcUvSession *session = (SUdfcUvSession *) handle; + SUdfcUvSession *session = (SUdfcUvSession *)handle; if (session->udfUvPipe == NULL) { fnError("tear down udf. pipe to udfd does not exist. udf name: %s", session->udfName); @@ -1827,7 +1797,7 @@ int32_t doTeardownUdf(UdfcFuncHandle handle) { udfcRunUdfUvTask(task, UV_TASK_DISCONNECT); fnInfo("tear down udf. udf name: %s, udf func handle: %p", session->udfName, handle); - //TODO: synchronization refactor between libuv event loop and request thread + // TODO: synchronization refactor between libuv event loop and request thread if (session->udfUvPipe != NULL && session->udfUvPipe->data != NULL) { SClientUvConn *conn = session->udfUvPipe->data; conn->session = NULL; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index 7395f1e14c..f1e3f4c60c 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -53,7 +53,7 @@ int scalarFuncTest() { blockDataEnsureCapacity(pBlock, 1024); pBlock->info.rows = 1024; - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); + SColumnInfoData *pCol = taosArrayGet(pBlock->pDataBlock, 0); for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendInt32(pCol, j, &j); } @@ -68,14 +68,13 @@ int scalarFuncTest() { SColumnInfoData *col = output.columnData; for (int32_t i = 0; i < output.numOfRows; ++i) { - if (i % 100 == 0) - fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); + if (i % 100 == 0) fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t))); } colDataDestroy(output.columnData); taosMemoryFree(output.columnData); } int64_t end = taosGetTimestampUs(); - fprintf(stderr, "time: %f\n", (end-beg)/1000.0); + fprintf(stderr, "time: %f\n", (end - beg) / 1000.0); doTeardownUdf(handle); return 0; @@ -92,13 +91,13 @@ int aggregateFuncTest() { SSDataBlock *pBlock = createDataBlock(); for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { SColumnInfoData colInfo = createColumnInfoData(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1); - blockDataAppendColInfo(pBlock, &colInfo); + blockDataAppendColInfo(pBlock, &colInfo); } blockDataEnsureCapacity(pBlock, 1024); pBlock->info.rows = 1024; - SColumnInfoData* pColInfo = bdGetColumnInfoData(pBlock, 0); + SColumnInfoData *pColInfo = bdGetColumnInfoData(pBlock, 0); for (int32_t j = 0; j < pBlock->info.rows; ++j) { colDataAppendInt32(pColInfo, j, &j); } @@ -111,7 +110,7 @@ int aggregateFuncTest() { taosArrayDestroy(pBlock->pDataBlock); doCallUdfAggFinalize(handle, &newBuf, &resultBuf); - fprintf(stderr, "agg result: %f\n", *(double*)resultBuf.buf); + fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf); freeUdfInterBuf(&buf); freeUdfInterBuf(&newBuf); diff --git a/source/libs/function/test/udf1.c b/source/libs/function/test/udf1.c index 5be18af553..71d30b6755 100644 --- a/source/libs/function/test/udf1.c +++ b/source/libs/function/test/udf1.c @@ -1,6 +1,6 @@ -#include -#include #include +#include +#include #ifdef LINUX #include #endif @@ -9,16 +9,11 @@ #endif #include "taosudf.h" +DLL_EXPORT int32_t udf1_init() { return 0; } -DLL_EXPORT int32_t udf1_init() { - return 0; -} - -DLL_EXPORT int32_t udf1_destroy() { - return 0; -} +DLL_EXPORT int32_t udf1_destroy() { return 0; } -DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { +DLL_EXPORT int32_t udf1(SUdfDataBlock *block, SUdfColumn *resultCol) { SUdfColumnMeta *meta = &resultCol->colMeta; meta->bytes = 4; meta->type = TSDB_DATA_TYPE_INT; @@ -35,14 +30,14 @@ DLL_EXPORT int32_t udf1(SUdfDataBlock* block, SUdfColumn *resultCol) { break; } } - if ( j == block->numOfCols) { + if (j == block->numOfCols) { int32_t luckyNum = 88; udfColDataSet(resultCol, i, (char *)&luckyNum, false); } } - //to simulate actual processing delay by udf + // to simulate actual processing delay by udf #ifdef LINUX - usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) + usleep(1 * 1000); // usleep takes sleep time in us (1 millionth of a second) #endif #ifdef WINDOWS Sleep(1); diff --git a/source/libs/function/test/udf2.c b/source/libs/function/test/udf2.c index 975832209e..e24789d0fb 100644 --- a/source/libs/function/test/udf2.c +++ b/source/libs/function/test/udf2.c @@ -1,32 +1,27 @@ -#include -#include -#include #include +#include +#include +#include #include "taosudf.h" -DLL_EXPORT int32_t udf2_init() { - return 0; -} +DLL_EXPORT int32_t udf2_init() { return 0; } -DLL_EXPORT int32_t udf2_destroy() { - return 0; -} +DLL_EXPORT int32_t udf2_destroy() { return 0; } -DLL_EXPORT int32_t udf2_start(SUdfInterBuf *buf) { +DLL_EXPORT int32_t udf2_start(SUdfInterBuf* buf) { *(int64_t*)(buf->buf) = 0; buf->bufLen = sizeof(double); buf->numOfResult = 0; return 0; } -DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) { +DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf* interBuf, SUdfInterBuf* newInterBuf) { double sumSquares = *(double*)interBuf->buf; int8_t numNotNull = 0; for (int32_t i = 0; i < block->numOfCols; ++i) { SUdfColumn* col = block->udfCols[i]; - if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || - col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { + if (!(col->colMeta.type == TSDB_DATA_TYPE_INT || col->colMeta.type == TSDB_DATA_TYPE_DOUBLE)) { return TSDB_CODE_UDF_INVALID_INPUT; } } @@ -38,18 +33,18 @@ DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterB } switch (col->colMeta.type) { case TSDB_DATA_TYPE_INT: { - char* cell = udfColDataGetData(col, j); + char* cell = udfColDataGetData(col, j); int32_t num = *(int32_t*)cell; sumSquares += (double)num * num; break; } case TSDB_DATA_TYPE_DOUBLE: { - char* cell = udfColDataGetData(col, j); + char* cell = udfColDataGetData(col, j); double num = *(double*)cell; sumSquares += num * num; break; } - default: + default: break; } ++numNotNull; @@ -67,7 +62,7 @@ DLL_EXPORT int32_t udf2(SUdfDataBlock* block, SUdfInterBuf *interBuf, SUdfInterB return 0; } -DLL_EXPORT int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf *resultData) { +DLL_EXPORT int32_t udf2_finish(SUdfInterBuf* buf, SUdfInterBuf* resultData) { if (buf->numOfResult == 0) { resultData->numOfResult = 0; return 0; diff --git a/tools/scripts/codeFormat.sh b/tools/scripts/codeFormat.sh index 00f56d89f0..798b566344 100644 --- a/tools/scripts/codeFormat.sh +++ b/tools/scripts/codeFormat.sh @@ -16,7 +16,7 @@ FORMAT_DIR_LIST=( "${PRJ_ROOT_DIR}/source/libs/catalog" "${PRJ_ROOT_DIR}/source/libs/command" "${PRJ_ROOT_DIR}/source/libs/executor" - # "${PRJ_ROOT_DIR}/source/libs/function" + "${PRJ_ROOT_DIR}/source/libs/function" "${PRJ_ROOT_DIR}/source/libs/index" "${PRJ_ROOT_DIR}/source/libs/monitor" "${PRJ_ROOT_DIR}/source/libs/nodes" @@ -49,3 +49,5 @@ for d in ${FORMAT_DIR_LIST[@]}; do done cd ${ORIGIN_DIR} + +# find source/libs/ -path ./source/libs/qworker -prune -o -regex '.*\.\(cpp\|hpp\|c\|h\)' -print -- GitLab