提交 025c9258 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into enh/TD-19660

...@@ -234,8 +234,6 @@ struct STag { ...@@ -234,8 +234,6 @@ struct STag {
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. // Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP #define TD_SUPPORT_BITMAP
#define TASSERT(x) ASSERT(x)
#define STR_TO_VARSTR(x, str) \ #define STR_TO_VARSTR(x, str) \
do { \ do { \
VarDataLenT __len = (VarDataLenT)strlen(str); \ VarDataLenT __len = (VarDataLenT)strlen(str); \
......
...@@ -255,7 +255,7 @@ static FORCE_INLINE void *tdGetBitmapAddrKv(STSRow *pRow, col_id_t nKvCols) { ...@@ -255,7 +255,7 @@ static FORCE_INLINE void *tdGetBitmapAddrKv(STSRow *pRow, col_id_t nKvCols) {
void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, col_id_t nKvCols); void *tdGetBitmapAddr(STSRow *pRow, uint8_t rowType, uint32_t flen, col_id_t nKvCols);
int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType, int8_t bitmapMode); int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType, int8_t bitmapMode);
int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType); int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType);
bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode); // bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode);
int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValType, int8_t bitmapMode); int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValType, int8_t bitmapMode);
// ----------------- Tuple row structure(STpRow) // ----------------- Tuple row structure(STpRow)
......
...@@ -68,9 +68,10 @@ int32_t streamStateSessionClear(SStreamState* pState); ...@@ -68,9 +68,10 @@ int32_t streamStateSessionClear(SStreamState* pState);
int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateSessionGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, const void** pVal, int32_t* pVLen);
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); state_key_cmpr_fn fn, void** pVal, int32_t* pVLen);
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey);
SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key);
SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key); SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKey* key);
int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateFillPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen);
......
...@@ -1858,7 +1858,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { ...@@ -1858,7 +1858,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
char* pData = colDataGetVarData(pColInfoData, j); char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memset(pBuf, 0, dataSize); memset(pBuf, 0, dataSize);
taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
printf(" %15s |", pBuf); printf(" %15s |", pBuf);
} break; } break;
default: default:
...@@ -1946,7 +1946,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) ...@@ -1946,7 +1946,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
char* pData = colDataGetVarData(pColInfoData, j); char* pData = colDataGetVarData(pColInfoData, j);
int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData)); int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData));
memset(pBuf, 0, sizeof(pBuf)); memset(pBuf, 0, sizeof(pBuf));
taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); (void)taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) return dumpBuf; if (len >= size - 1) return dumpBuf;
} break; } break;
...@@ -2053,7 +2053,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -2053,7 +2053,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
case TSDB_DATA_TYPE_JSON: case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_MEDIUMBLOB: case TSDB_DATA_TYPE_MEDIUMBLOB:
uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type); uError("the column type %" PRIi16 " is defined but not implemented yet", pColInfoData->info.type);
TASSERT(0); ASSERT(0);
break; break;
default: default:
if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) { if (pColInfoData->info.type < TSDB_DATA_TYPE_MAX && pColInfoData->info.type > TSDB_DATA_TYPE_NULL) {
...@@ -2084,7 +2084,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB ...@@ -2084,7 +2084,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SSDataBlock* pDataB
} }
} else { } else {
uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type); uError("the column type %" PRIi16 " is undefined\n", pColInfoData->info.type);
TASSERT(0); ASSERT(0);
} }
break; break;
} }
......
...@@ -481,7 +481,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell ...@@ -481,7 +481,7 @@ bool tdSTSRowGetVal(STSRowIter *pIter, col_id_t colId, col_type_t colType, SCell
int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType) { int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) { if (!pBitmap || colIdx < 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -503,7 +503,7 @@ int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pVa ...@@ -503,7 +503,7 @@ int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pVa
*pValType = ((*pDestByte) & 0x03); *pValType = ((*pDestByte) & 0x03);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -512,7 +512,7 @@ int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pVa ...@@ -512,7 +512,7 @@ int32_t tdGetBitmapValTypeII(const void *pBitmap, int16_t colIdx, TDRowValT *pVa
int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pValType) { int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pValType) {
if (!pBitmap || colIdx < 0) { if (!pBitmap || colIdx < 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -546,7 +546,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal ...@@ -546,7 +546,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal
*pValType = ((*pDestByte) & 0x01); *pValType = ((*pDestByte) & 0x01);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -555,7 +555,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal ...@@ -555,7 +555,7 @@ int32_t tdGetBitmapValTypeI(const void *pBitmap, int16_t colIdx, TDRowValT *pVal
int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) { int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) {
if (!pBitmap || colIdx < 0) { if (!pBitmap || colIdx < 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -598,7 +598,7 @@ int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) { ...@@ -598,7 +598,7 @@ int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) {
// *pDestByte |= (valType); // *pDestByte |= (valType);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -607,7 +607,7 @@ int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) { ...@@ -607,7 +607,7 @@ int32_t tdSetBitmapValTypeI(void *pBitmap, int16_t colIdx, TDRowValT valType) {
int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset, int16_t colIdx) { int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_t offset, int16_t colIdx) {
#ifdef TD_SUPPORT_BITMAP #ifdef TD_SUPPORT_BITMAP
TASSERT(colIdx < tdRowGetNCols(pRow) - 1); ASSERT(colIdx < tdRowGetNCols(pRow) - 1);
if (tdGetBitmapValType(pBitmap, colIdx, &output->valType, 0) != TSDB_CODE_SUCCESS) { if (tdGetBitmapValType(pBitmap, colIdx, &output->valType, 0) != TSDB_CODE_SUCCESS) {
output->valType = TD_VTYPE_NONE; output->valType = TD_VTYPE_NONE;
return terrno; return terrno;
...@@ -621,7 +621,7 @@ int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_ ...@@ -621,7 +621,7 @@ int32_t tdGetKvRowValOfCol(SCellVal *output, STSRow *pRow, void *pBitmap, int32_
output->val = POINTER_SHIFT(pRow, offset); output->val = POINTER_SHIFT(pRow, offset);
} }
#else #else
TASSERT(0); ASSERT(0);
if (offset < 0) { if (offset < 0) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
output->valType = TD_VTYPE_NONE; output->valType = TD_VTYPE_NONE;
...@@ -671,7 +671,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp ...@@ -671,7 +671,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp
return terrno; return terrno;
} }
#else #else
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
#endif #endif
...@@ -709,7 +709,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp ...@@ -709,7 +709,7 @@ int32_t tdAppendColValToRow(SRowBuilder *pBuilder, col_id_t colId, int8_t colTyp
int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData, int32_t tdAppendColValToKvRow(SRowBuilder *pBuilder, TDRowValT valType, const void *val, bool isCopyVarData,
int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId) { int8_t colType, int16_t colIdx, int32_t offset, col_id_t colId) {
if ((offset < (int32_t)sizeof(SKvRowIdx)) || (colIdx < 1)) { if ((offset < (int32_t)sizeof(SKvRowIdx)) || (colIdx < 1)) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -798,7 +798,7 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou ...@@ -798,7 +798,7 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou
pBuilder->nCols = nCols; pBuilder->nCols = nCols;
pBuilder->nBoundCols = nBoundCols; pBuilder->nBoundCols = nBoundCols;
if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) { if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -820,7 +820,7 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou ...@@ -820,7 +820,7 @@ int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBou
int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
pBuilder->pBuf = (STSRow *)pBuf; pBuilder->pBuf = (STSRow *)pBuf;
if (!pBuilder->pBuf) { if (!pBuilder->pBuf) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -831,7 +831,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -831,7 +831,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
TD_ROW_SET_INFO(pBuilder->pBuf, 0); TD_ROW_SET_INFO(pBuilder->pBuf, 0);
TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType); TD_ROW_SET_TYPE(pBuilder->pBuf, pBuilder->rowType);
TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0); ASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0);
uint32_t len = 0; uint32_t len = 0;
switch (pBuilder->rowType) { switch (pBuilder->rowType) {
...@@ -857,7 +857,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -857,7 +857,7 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
TD_ROW_SET_NCOLS(pBuilder->pBuf, pBuilder->nBoundCols); TD_ROW_SET_NCOLS(pBuilder->pBuf, pBuilder->nBoundCols);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -868,12 +868,12 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -868,12 +868,12 @@ int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) {
int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) { int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
pBuilder->pBuf = (STSRow *)pBuf; pBuilder->pBuf = (STSRow *)pBuf;
if (!pBuilder->pBuf) { if (!pBuilder->pBuf) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
TASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0); ASSERT(pBuilder->nBitmaps > 0 && pBuilder->flen > 0);
uint32_t len = 0; uint32_t len = 0;
switch (pBuilder->rowType) { switch (pBuilder->rowType) {
...@@ -888,7 +888,7 @@ int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) { ...@@ -888,7 +888,7 @@ int32_t tdSRowGetBuf(SRowBuilder *pBuilder, void *pBuf) {
#endif #endif
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -908,7 +908,7 @@ int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t flen) { ...@@ -908,7 +908,7 @@ int32_t tdSRowSetTpInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t flen) {
pBuilder->flen = flen; pBuilder->flen = flen;
pBuilder->nCols = nCols; pBuilder->nCols = nCols;
if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) { if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -927,7 +927,7 @@ int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols, ...@@ -927,7 +927,7 @@ int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols,
pBuilder->nCols = nCols; pBuilder->nCols = nCols;
pBuilder->nBoundCols = nBoundCols; pBuilder->nBoundCols = nBoundCols;
if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) { if (pBuilder->flen <= 0 || pBuilder->nCols <= 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -956,13 +956,13 @@ int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValT ...@@ -956,13 +956,13 @@ int32_t tdGetBitmapValType(const void *pBitmap, int16_t colIdx, TDRowValT *pValT
tdGetBitmapValTypeI(pBitmap, colIdx, pValType); tdGetBitmapValTypeI(pBitmap, colIdx, pValType);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#if 0
bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode) { bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode) {
TDRowValT valType = 0; TDRowValT valType = 0;
tdGetBitmapValType(pBitmap, idx, &valType, bitmapMode); tdGetBitmapValType(pBitmap, idx, &valType, bitmapMode);
...@@ -971,10 +971,11 @@ bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode) ...@@ -971,10 +971,11 @@ bool tdIsBitmapValTypeNorm(const void *pBitmap, int16_t idx, int8_t bitmapMode)
} }
return false; return false;
} }
#endif
int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType) { int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType) {
if (!pBitmap || colIdx < 0) { if (!pBitmap || colIdx < 0) {
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -1001,7 +1002,7 @@ int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType) { ...@@ -1001,7 +1002,7 @@ int32_t tdSetBitmapValTypeII(void *pBitmap, int16_t colIdx, TDRowValT valType) {
// *pDestByte |= (valType); // *pDestByte |= (valType);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return terrno; return terrno;
} }
...@@ -1018,7 +1019,7 @@ int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType, int ...@@ -1018,7 +1019,7 @@ int32_t tdSetBitmapValType(void *pBitmap, int16_t colIdx, TDRowValT valType, int
tdSetBitmapValTypeI(pBitmap, colIdx, valType); tdSetBitmapValTypeI(pBitmap, colIdx, valType);
break; break;
default: default:
TASSERT(0); ASSERT(0);
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
......
...@@ -113,7 +113,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) { ...@@ -113,7 +113,7 @@ static SSdbRaw *mndDbActionEncode(SDbObj *pDb) {
SDB_SET_INT8(pRaw, dataPos, pDb->cfg.hashMethod, _OVER) SDB_SET_INT8(pRaw, dataPos, pDb->cfg.hashMethod, _OVER)
SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, _OVER) SDB_SET_INT32(pRaw, dataPos, pDb->cfg.numOfRetensions, _OVER)
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) { for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
TASSERT(taosArrayGetSize(pDb->cfg.pRetensions) == pDb->cfg.numOfRetensions); ASSERT(taosArrayGetSize(pDb->cfg.pRetensions) == pDb->cfg.numOfRetensions);
SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i); SRetention *pRetension = taosArrayGet(pDb->cfg.pRetensions, i);
SDB_SET_INT64(pRaw, dataPos, pRetension->freq, _OVER) SDB_SET_INT64(pRaw, dataPos, pRetension->freq, _OVER)
SDB_SET_INT64(pRaw, dataPos, pRetension->keep, _OVER) SDB_SET_INT64(pRaw, dataPos, pRetension->keep, _OVER)
......
...@@ -3594,9 +3594,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT ...@@ -3594,9 +3594,7 @@ void getCurSessionWindow(SStreamAggSupporter* pAggSup, TSKEY startTs, TSKEY endT
pKey->win.skey = startTs; pKey->win.skey = startTs;
pKey->win.ekey = endTs; pKey->win.ekey = endTs;
pKey->groupId = groupId; pKey->groupId = groupId;
SStreamStateCur* pCur = streamStateSessionGetCur(pAggSup->pState, pKey); int32_t code = streamStateSessionGetKey(pAggSup->pState, pKey, pKey);
int32_t code = streamStateSessionGetKVByCur(pCur, pKey, NULL, 0);
streamStateFreeCur(pCur);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SET_SESSION_WIN_KEY_INVALID(pKey); SET_SESSION_WIN_KEY_INVALID(pKey);
} }
......
...@@ -35,7 +35,7 @@ enum { ...@@ -35,7 +35,7 @@ enum {
}; };
typedef struct SUdfSetupRequest { typedef struct SUdfSetupRequest {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN + 1];
} SUdfSetupRequest; } SUdfSetupRequest;
typedef struct SUdfSetupResponse { typedef struct SUdfSetupResponse {
......
...@@ -315,7 +315,7 @@ enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 }; ...@@ -315,7 +315,7 @@ enum { UV_TASK_CONNECT = 0, UV_TASK_REQ_RSP = 1, UV_TASK_DISCONNECT = 2 };
int64_t gUdfTaskSeqNum = 0; int64_t gUdfTaskSeqNum = 0;
typedef struct SUdfcFuncStub { typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN + 1];
UdfcFuncHandle handle; UdfcFuncHandle handle;
int32_t refCount; int32_t refCount;
int64_t lastRefTime; int64_t lastRefTime;
...@@ -353,7 +353,7 @@ typedef struct SUdfcUvSession { ...@@ -353,7 +353,7 @@ typedef struct SUdfcUvSession {
int32_t outputLen; int32_t outputLen;
int32_t bufSize; int32_t bufSize;
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN + 1];
} SUdfcUvSession; } SUdfcUvSession;
typedef struct SClientUvTaskNode { typedef struct SClientUvTaskNode {
...@@ -898,7 +898,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -898,7 +898,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
int32_t code = 0; int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) { if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex); SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
...@@ -936,7 +936,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) { ...@@ -936,7 +936,7 @@ int32_t acquireUdfFuncHandle(char *udfName, UdfcFuncHandle *pHandle) {
void releaseUdfFuncHandle(char *udfName) { void releaseUdfFuncHandle(char *udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex); uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0}; SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName); strncpy(key.udfName, udfName, TSDB_FUNC_NAME_LEN);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ); SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (!foundStub) { if (!foundStub) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
...@@ -1446,6 +1446,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) { ...@@ -1446,6 +1446,7 @@ int32_t udfcStartUvTask(SClientUvTaskNode *uvTask) {
QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue); QUEUE_INSERT_TAIL(connTaskQueue, &uvTask->connTaskQueue);
int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite); int err = uv_write(write, (uv_stream_t *)pipe, &uvTask->reqBuf, 1, onUdfcPipeWrite);
if (err != 0) { if (err != 0) {
taosMemoryFree(write);
fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err)); fnError("udfc event loop start req_rsp task uv_write failed. uvtask: %p, code: %s", uvTask, uv_strerror(err));
} }
code = err; code = err;
...@@ -1637,7 +1638,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) { ...@@ -1637,7 +1638,7 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
task->session->outputType = rsp->outputType; task->session->outputType = rsp->outputType;
task->session->outputLen = rsp->outputLen; task->session->outputLen = rsp->outputLen;
task->session->bufSize = rsp->bufSize; task->session->bufSize = rsp->bufSize;
strcpy(task->session->udfName, udfName); strncpy(task->session->udfName, udfName, TSDB_FUNC_NAME_LEN);
if (task->errCode != 0) { if (task->errCode != 0) {
fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode) fnError("failed to setup udf. udfname: %s, err: %d", udfName, task->errCode)
} else { } else {
......
...@@ -71,7 +71,7 @@ typedef struct SUdf { ...@@ -71,7 +71,7 @@ typedef struct SUdf {
uv_cond_t condReady; uv_cond_t condReady;
bool resident; bool resident;
char name[TSDB_FUNC_NAME_LEN]; char name[TSDB_FUNC_NAME_LEN + 1];
int8_t funcType; int8_t funcType;
int8_t scriptType; int8_t scriptType;
int8_t outputType; int8_t outputType;
...@@ -188,11 +188,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -188,11 +188,12 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf)); SUdf *udfNew = taosMemoryCalloc(1, sizeof(SUdf));
udfNew->refCount = 1; udfNew->refCount = 1;
udfNew->state = UDF_STATE_INIT; udfNew->state = UDF_STATE_INIT;
uv_mutex_init(&udfNew->lock); uv_mutex_init(&udfNew->lock);
uv_cond_init(&udfNew->condReady); uv_cond_init(&udfNew->condReady);
udf = udfNew; udf = udfNew;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), &udfNew, sizeof(&udfNew)); SUdf** pUdf = &udf;
taosHashPut(global.udfsHash, request->setup.udfName, strlen(request->setup.udfName), pUdf, POINTER_BYTES);
uv_mutex_unlock(&global.udfsMutex); uv_mutex_unlock(&global.udfsMutex);
} }
...@@ -246,7 +247,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -246,7 +247,7 @@ void udfdProcessSetupRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
SUdfCallRequest *call = &request->call; SUdfCallRequest *call = &request->call;
fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request->seqNum, call->callType, call->udfHandle); fnDebug("call request. call type %d, handle: %" PRIx64 ", seq num %" PRId64 , call->callType, call->udfHandle, request->seqNum);
SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle); SUdfcFuncHandle * handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf * udf = handle->udf; SUdf * udf = handle->udf;
SUdfResponse response = {0}; SUdfResponse response = {0};
...@@ -372,7 +373,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) { ...@@ -372,7 +373,7 @@ void udfdProcessTeardownRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
} }
taosMemoryFree(handle); taosMemoryFree(handle);
SUdfResponse response; SUdfResponse response = {0};
SUdfResponse *rsp = &response; SUdfResponse *rsp = &response;
rsp->seqNum = request->seqNum; rsp->seqNum = request->seqNum;
rsp->type = request->type; rsp->type = request->type;
...@@ -428,7 +429,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { ...@@ -428,7 +429,9 @@ void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) { } else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0}; SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp); tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
if (retrieveRsp.pFuncInfos == NULL) {
goto _return;
}
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0); SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf * udf = msgInfo->param; SUdf * udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType; udf->funcType = pFuncInfo->funcType;
...@@ -540,7 +543,7 @@ int32_t udfdConnectToMnode() { ...@@ -540,7 +543,7 @@ int32_t udfdConnectToMnode() {
} }
int32_t udfdLoadUdf(char *udfName, SUdf *udf) { int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strcpy(udf->name, udfName); strncpy(udf->name, udfName, TSDB_FUNC_NAME_LEN);
int32_t err = 0; int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf); err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
...@@ -880,6 +883,8 @@ static int32_t udfdUvInit() { ...@@ -880,6 +883,8 @@ static int32_t udfdUvInit() {
uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) { if (loop) {
uv_loop_init(loop); uv_loop_init(loop);
} else {
return -1;
} }
global.loop = loop; global.loop = loop;
...@@ -901,12 +906,12 @@ static int32_t udfdUvInit() { ...@@ -901,12 +906,12 @@ static int32_t udfdUvInit() {
if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) { if ((r = uv_pipe_bind(&global.listeningPipe, global.listenPipeName))) {
fnError("Bind error %s", uv_err_name(r)); fnError("Bind error %s", uv_err_name(r));
removeListeningPipe(); removeListeningPipe();
return -1; return -2;
} }
if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) { if ((r = uv_listen((uv_stream_t *)&global.listeningPipe, 128, udfdOnNewConnection))) {
fnError("Listen error %s", uv_err_name(r)); fnError("Listen error %s", uv_err_name(r));
removeListeningPipe(); removeListeningPipe();
return -2; return -3;
} }
return 0; return 0;
} }
...@@ -962,6 +967,7 @@ int32_t udfdInitResidentFuncs() { ...@@ -962,6 +967,7 @@ int32_t udfdInitResidentFuncs() {
while ((token = strtok_r(pSave, ",", &pSave)) != NULL) { while ((token = strtok_r(pSave, ",", &pSave)) != NULL) {
char func[TSDB_FUNC_NAME_LEN+1] = {0}; char func[TSDB_FUNC_NAME_LEN+1] = {0};
strncpy(func, token, TSDB_FUNC_NAME_LEN); strncpy(func, token, TSDB_FUNC_NAME_LEN);
fnInfo("udfd add resident function %s", func);
taosArrayPush(global.residentFuncs, func); taosArrayPush(global.residentFuncs, func);
} }
......
...@@ -110,8 +110,11 @@ int aggregateFuncTest() { ...@@ -110,8 +110,11 @@ int aggregateFuncTest() {
taosArrayDestroy(pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock);
doCallUdfAggFinalize(handle, &newBuf, &resultBuf); doCallUdfAggFinalize(handle, &newBuf, &resultBuf);
if (resultBuf.buf != NULL) {
fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf); fprintf(stderr, "agg result: %f\n", *(double *)resultBuf.buf);
} else {
fprintf(stderr, "result buffer is null");
}
freeUdfInterBuf(&buf); freeUdfInterBuf(&buf);
freeUdfInterBuf(&newBuf); freeUdfInterBuf(&newBuf);
freeUdfInterBuf(&resultBuf); freeUdfInterBuf(&resultBuf);
......
...@@ -367,7 +367,7 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) { ...@@ -367,7 +367,7 @@ int32_t idxConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(dst, src, strlen(src)); tlen = taosEncodeBinary(dst, src, strlen(src));
break; break;
default: default:
TASSERT(0); ASSERT(0);
break; break;
} }
*dst = (char*)*dst - tlen; *dst = (char*)*dst - tlen;
...@@ -459,7 +459,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) { ...@@ -459,7 +459,7 @@ int32_t idxConvertDataToStr(void* src, int8_t type, void** dst) {
*dst = (char*)*dst - tlen; *dst = (char*)*dst - tlen;
break; break;
default: default:
TASSERT(0); ASSERT(0);
break; break;
} }
return tlen; return tlen;
......
...@@ -526,7 +526,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) { ...@@ -526,7 +526,7 @@ int32_t streamStateSessionDel(SStreamState* pState, const SSessionKey* key) {
return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn); return tdbTbDelete(pState->pSessionStateDb, &sKey, sizeof(SStateSessionKey), &pState->txn);
} }
SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSessionKey* key) { SStreamStateCur* streamStateSessionSeekKeyCurrentPrev(SStreamState* pState, const SSessionKey* key) {
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
if (pCur == NULL) { if (pCur == NULL) {
return NULL; return NULL;
...@@ -544,7 +544,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSess ...@@ -544,7 +544,7 @@ SStreamStateCur* streamStateSessionSeekKeyPrev(SStreamState* pState, const SSess
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (c > 0) return pCur; if (c >= 0) return pCur;
if (tdbTbcMoveToPrev(pCur->pCur) < 0) { if (tdbTbcMoveToPrev(pCur->pCur) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
...@@ -572,7 +572,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess ...@@ -572,7 +572,7 @@ SStreamStateCur* streamStateSessionSeekKeyNext(SStreamState* pState, const SSess
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return NULL; return NULL;
} }
if (c > 0) return pCur; if (c < 0) return pCur;
if (tdbTbcMoveToNext(pCur->pCur) < 0) { if (tdbTbcMoveToNext(pCur->pCur) < 0) {
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
...@@ -630,7 +630,7 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe ...@@ -630,7 +630,7 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe
streamStateCurPrev(pState, pCur); streamStateCurPrev(pState, pCur);
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == TSDB_CODE_SUCCESS && sessionKeyCmpr(key, &tmpKey) == 0) { if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
resKey = tmpKey; resKey = tmpKey;
} else { } else {
break; break;
...@@ -640,9 +640,28 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe ...@@ -640,9 +640,28 @@ SStreamStateCur* streamStateSessionGetCur(SStreamState* pState, const SSessionKe
return streamStateSessionGetRanomCur(pState, &resKey); return streamStateSessionGetRanomCur(pState, &resKey);
} }
int32_t streamStateSessionGetKey(SStreamState* pState, const SSessionKey* key, SSessionKey* curKey) {
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
SSessionKey resKey = *key;
int32_t res = -1;
while (1) {
SSessionKey tmpKey = *key;
int32_t code = streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0);
if (code == 0 && sessionKeyCmpr(key, &tmpKey) == 0) {
res = 0;
resKey = tmpKey;
} else {
break;
}
streamStateCurPrev(pState, pCur);
}
*curKey = resKey;
return res;
}
int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) { int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
// todo refactor // todo refactor
SStreamStateCur* pCur = streamStateSessionGetCur(pState, key); SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key);
int32_t size = *pVLen; int32_t size = *pVLen;
void* tmp = NULL; void* tmp = NULL;
*pVal = tdbRealloc(NULL, size); *pVal = tdbRealloc(NULL, size);
...@@ -659,7 +678,7 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key, ...@@ -659,7 +678,7 @@ int32_t streamStateSessionAddIfNotExist(SStreamState* pState, SSessionKey* key,
int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen, int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, char* pKeyData, int32_t keyDataLen,
state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) { state_key_cmpr_fn fn, void** pVal, int32_t* pVLen) {
// todo refactor // todo refactor
int32_t res = TSDB_CODE_SUCCESS; int32_t res = 0;
SSessionKey tmpKey = *key; SSessionKey tmpKey = *key;
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
void* tmp = tdbRealloc(NULL, valSize); void* tmp = tdbRealloc(NULL, valSize);
...@@ -667,21 +686,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch ...@@ -667,21 +686,14 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
return -1; return -1;
} }
SStreamStateCur* pCur = streamStateSessionGetRanomCur(pState, key); SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentPrev(pState, key);
int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); int32_t code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) { if (code == 0) {
if (key->win.skey <= tmpKey.win.skey && tmpKey.win.ekey <= key->win.ekey) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
*pVal = tmp; goto _end;
streamStateFreeCur(pCur);
return res;
} }
streamStateFreeCur(pCur);
streamStateSessionPut(pState, key, NULL, 0);
pCur = streamStateSessionGetRanomCur(pState, key);
streamStateCurPrev(pState, pCur);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
...@@ -689,11 +701,9 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch ...@@ -689,11 +701,9 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
} }
} }
streamStateFreeCur(pCur); streamStateCurNext(pState, pCur);
*key = tmpKey;
pCur = streamStateSessionSeekKeyNext(pState, key);
code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen); code = streamStateSessionGetKVByCur(pCur, key, (const void**)pVal, pVLen);
if (code == TSDB_CODE_SUCCESS) { if (code == 0) {
void* stateKey = (char*)(*pVal) + (valSize - keyDataLen); void* stateKey = (char*)(*pVal) + (valSize - keyDataLen);
if (fn(pKeyData, stateKey) == true) { if (fn(pKeyData, stateKey) == true) {
memcpy(tmp, *pVal, valSize); memcpy(tmp, *pVal, valSize);
...@@ -708,7 +718,6 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch ...@@ -708,7 +718,6 @@ int32_t streamStateStateAddIfNotExist(SStreamState* pState, SSessionKey* key, ch
_end: _end:
*pVal = tmp; *pVal = tmp;
streamStateSessionDel(pState, &tmpKey);
streamStateFreeCur(pCur); streamStateFreeCur(pCur);
return res; return res;
} }
...@@ -944,6 +944,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { ...@@ -944,6 +944,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
if (taosReadFile(pFile, buf, fileSize) <= 0) { if (taosReadFile(pFile, buf, fileSize) <= 0) {
taosCloseFile(&pFile); taosCloseFile(&pFile);
uError("load json file error: %s", filepath); uError("load json file error: %s", filepath);
taosMemoryFreeClear(buf);
return -1; return -1;
} }
taosCloseFile(&pFile); taosCloseFile(&pFile);
...@@ -953,6 +954,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) { ...@@ -953,6 +954,7 @@ int32_t cfgLoadFromApollUrl(SConfig *pConfig, const char *url) {
if (jsonParseError != NULL) { if (jsonParseError != NULL) {
uError("load json file parse error: %s", jsonParseError); uError("load json file parse error: %s", jsonParseError);
} }
taosMemoryFreeClear(buf);
return -1; return -1;
} }
taosMemoryFreeClear(buf); taosMemoryFreeClear(buf);
......
...@@ -562,23 +562,15 @@ void parseCommand(SWords* command, bool pattern) { ...@@ -562,23 +562,15 @@ void parseCommand(SWords* command, bool pattern) {
// free SShellCmd // free SShellCmd
void freeCommand(SWords* command) { void freeCommand(SWords* command) {
SWord* word = command->head; SWord* item = command->head;
if (word == NULL) {
return;
}
// loop // loop
while (word->next) { while (item) {
SWord* tmp = word; SWord* tmp = item;
word = word->next; item = item->next;
// if malloc need free // if malloc need free
if (tmp->free && tmp->word) taosMemoryFree(tmp->word); if (tmp->free && tmp->word) taosMemoryFree(tmp->word);
taosMemoryFree(tmp); taosMemoryFree(tmp);
} }
// if malloc need free
if (word->free && word->word) taosMemoryFree(word->word);
taosMemoryFree(word);
} }
void GenerateVarType(int type, char** p, int count) { void GenerateVarType(int type, char** p, int count) {
...@@ -1204,11 +1196,11 @@ bool nextMatchCommand(TAOS* con, SShellCmd* cmd, SWords* firstMatch) { ...@@ -1204,11 +1196,11 @@ bool nextMatchCommand(TAOS* con, SShellCmd* cmd, SWords* firstMatch) {
#endif #endif
// free // free
freeCommand(input);
if (input->source) { if (input->source) {
taosMemoryFree(input->source); taosMemoryFree(input->source);
input->source = NULL; input->source = NULL;
} }
freeCommand(input);
taosMemoryFree(input); taosMemoryFree(input);
return true; return true;
...@@ -1377,7 +1369,7 @@ bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* sql, int32_t len) { ...@@ -1377,7 +1369,7 @@ bool appendAfterSelect(TAOS* con, SShellCmd* cmd, char* sql, int32_t len) {
bool ret = false; bool ret = false;
if (from == NULL) { if (from == NULL) {
bool fieldEnd = fieldsInputEnd(p); bool fieldEnd = fieldsInputEnd(p);
// cheeck fields input end then insert from keyword // check fields input end then insert from keyword
if (fieldEnd && p[len - 1] == ' ') { if (fieldEnd && p[len - 1] == ' ') {
shellInsertChar(cmd, "from", 4); shellInsertChar(cmd, "from", 4);
taosMemoryFree(p); taosMemoryFree(p);
......
...@@ -101,8 +101,13 @@ void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) { ...@@ -101,8 +101,13 @@ void shellInsertChar(SShellCmd *cmd, char *c, int32_t size) {
/* update the values */ /* update the values */
cmd->commandSize += size; cmd->commandSize += size;
cmd->cursorOffset += size; cmd->cursorOffset += size;
for (int i = 0; i < size; i++) {
taosMbToWchar(&wc, c + i, size);
cmd->screenOffset += taosWcharWidth(wc); cmd->screenOffset += taosWcharWidth(wc);
cmd->endOffset += taosWcharWidth(wc); cmd->endOffset += taosWcharWidth(wc);
}
// set string end
cmd->command[cmd->commandSize] = 0;
#ifdef WINDOWS #ifdef WINDOWS
#else #else
shellShowOnScreen(cmd); shellShowOnScreen(cmd);
...@@ -123,6 +128,8 @@ void shellBackspaceChar(SShellCmd *cmd) { ...@@ -123,6 +128,8 @@ void shellBackspaceChar(SShellCmd *cmd) {
cmd->cursorOffset -= size; cmd->cursorOffset -= size;
cmd->screenOffset -= width; cmd->screenOffset -= width;
cmd->endOffset -= width; cmd->endOffset -= width;
// set string end
cmd->command[cmd->commandSize] = 0;
shellShowOnScreen(cmd); shellShowOnScreen(cmd);
} }
} }
...@@ -136,6 +143,8 @@ void shellClearLineBefore(SShellCmd *cmd) { ...@@ -136,6 +143,8 @@ void shellClearLineBefore(SShellCmd *cmd) {
cmd->cursorOffset = 0; cmd->cursorOffset = 0;
cmd->screenOffset = 0; cmd->screenOffset = 0;
cmd->endOffset = cmd->commandSize; cmd->endOffset = cmd->commandSize;
// set string end
cmd->command[cmd->commandSize] = 0;
shellShowOnScreen(cmd); shellShowOnScreen(cmd);
} }
...@@ -160,6 +169,8 @@ void shellDeleteChar(SShellCmd *cmd) { ...@@ -160,6 +169,8 @@ void shellDeleteChar(SShellCmd *cmd) {
cmd->commandSize - cmd->cursorOffset - size); cmd->commandSize - cmd->cursorOffset - size);
cmd->commandSize -= size; cmd->commandSize -= size;
cmd->endOffset -= width; cmd->endOffset -= width;
// set string end
cmd->command[cmd->commandSize] = 0;
shellShowOnScreen(cmd); shellShowOnScreen(cmd);
} }
} }
......
...@@ -309,27 +309,24 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) { ...@@ -309,27 +309,24 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
} }
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) { SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {
if (match == NULL) { SMatch* rMatch = match; // define return match
match = (SMatch*)taosMemoryMalloc(sizeof(SMatch)); if (rMatch == NULL) {
memset(match, 0, sizeof(SMatch)); rMatch = (SMatch*)taosMemoryMalloc(sizeof(SMatch));
memset(rMatch, 0, sizeof(SMatch));
} }
switch (tire->type) { switch (tire->type) {
case TIRE_TREE: case TIRE_TREE:
matchPrefixFromTree(tire, prefix, match); matchPrefixFromTree(tire, prefix, rMatch);
break;
case TIRE_LIST: case TIRE_LIST:
matchPrefixFromList(tire, prefix, match); matchPrefixFromList(tire, prefix, rMatch);
break;
default: default:
break; break;
} }
// return if need return rMatch;
if (match->count == 0) {
freeMatch(match);
match = NULL;
}
return match;
} }
// get all items from tires tree // get all items from tires tree
...@@ -378,8 +375,10 @@ SMatch* enumAll(STire* tire) { ...@@ -378,8 +375,10 @@ SMatch* enumAll(STire* tire) {
switch (tire->type) { switch (tire->type) {
case TIRE_TREE: case TIRE_TREE:
enumFromTree(tire, match); enumFromTree(tire, match);
break;
case TIRE_LIST: case TIRE_LIST:
enumFromList(tire, match); enumFromList(tire, match);
break;
default: default:
break; break;
} }
......
...@@ -19,20 +19,20 @@ ...@@ -19,20 +19,20 @@
#include "os.h" #include "os.h"
#include "cJSON.h" #include "cJSON.h"
#include "tconfig.h"
#include "taos.h" #include "taos.h"
#include "taoserror.h" #include "taoserror.h"
#include "tconfig.h"
#include "tglobal.h"
#include "tidpool.h" #include "tidpool.h"
#include "tlog.h" #include "tlog.h"
#include "ttimer.h" #include "ttimer.h"
#include "ttypes.h" #include "ttypes.h"
#include "tutil.h" #include "tutil.h"
#include "tglobal.h"
#define MAX_MAIN_SCRIPT_NUM 10 #define MAX_MAIN_SCRIPT_NUM 10
#define MAX_BACKGROUND_SCRIPT_NUM 10 #define MAX_BACKGROUND_SCRIPT_NUM 10
#define MAX_FILE_NAME_LEN 256 #define MAX_FILE_NAME_LEN 256
#define MAX_ERROR_LEN 1024 #define MAX_ERROR_LEN 4096
#define MAX_QUERY_VALUE_LEN 1024 #define MAX_QUERY_VALUE_LEN 1024
#define MAX_QUERY_COL_NUM 100 #define MAX_QUERY_COL_NUM 100
#define MAX_QUERY_ROW_NUM 100 #define MAX_QUERY_ROW_NUM 100
...@@ -55,12 +55,42 @@ ...@@ -55,12 +55,42 @@
#define FAILED_POSTFIX "" #define FAILED_POSTFIX ""
#endif #endif
#define simFatal(...) { if (simDebugFlag & DEBUG_FATAL) { taosPrintLog("SIM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} #define simFatal(...) \
#define simError(...) { if (simDebugFlag & DEBUG_ERROR) { taosPrintLog("SIM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} { \
#define simWarn(...) { if (simDebugFlag & DEBUG_WARN) { taosPrintLog("SIM WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} if (simDebugFlag & DEBUG_FATAL) { \
#define simInfo(...) { if (simDebugFlag & DEBUG_INFO) { taosPrintLog("SIM ", DEBUG_INFO, 255, __VA_ARGS__); }} taosPrintLog("SIM FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
#define simDebug(...) { if (simDebugFlag & DEBUG_DEBUG) { taosPrintLog("SIM ", DEBUG_DEBUG, simDebugFlag, __VA_ARGS__); }} } \
#define simTrace(...) { if (simDebugFlag & DEBUG_TRACE) { taosPrintLog("SIM ", DEBUG_TRACE, simDebugFlag, __VA_ARGS__); }} }
#define simError(...) \
{ \
if (simDebugFlag & DEBUG_ERROR) { \
taosPrintLog("SIM ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
}
#define simWarn(...) \
{ \
if (simDebugFlag & DEBUG_WARN) { \
taosPrintLog("SIM WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define simInfo(...) \
{ \
if (simDebugFlag & DEBUG_INFO) { \
taosPrintLog("SIM ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define simDebug(...) \
{ \
if (simDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("SIM ", DEBUG_DEBUG, simDebugFlag, __VA_ARGS__); \
} \
}
#define simTrace(...) \
{ \
if (simDebugFlag & DEBUG_TRACE) { \
taosPrintLog("SIM ", DEBUG_TRACE, simDebugFlag, __VA_ARGS__); \
} \
}
enum { SIM_SCRIPT_TYPE_MAIN, SIM_SCRIPT_TYPE_BACKGROUND }; enum { SIM_SCRIPT_TYPE_MAIN, SIM_SCRIPT_TYPE_BACKGROUND };
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册