提交 7f755f60 编写于 作者: S Shengliang Guan

Merge remote-tracking branch 'origin/3.0' into feature/shm

...@@ -59,10 +59,7 @@ typedef struct SDataBlockInfo { ...@@ -59,10 +59,7 @@ typedef struct SDataBlockInfo {
int32_t rowSize; int32_t rowSize;
int16_t numOfCols; int16_t numOfCols;
int16_t hasVarCol; int16_t hasVarCol;
union { union {int64_t uid; int64_t blockId;};
int64_t uid;
int64_t blockId;
};
int64_t groupId; // no need to serialize int64_t groupId; // no need to serialize
} SDataBlockInfo; } SDataBlockInfo;
...@@ -96,6 +93,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); ...@@ -96,6 +93,7 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
int32_t tEncodeDataBlocks(void** buf, const SArray* blocks); int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
void* tDecodeDataBlocks(const void* buf, SArray** blocks); void* tDecodeDataBlocks(const void* buf, SArray** blocks);
void colDataDestroy(SColumnInfoData* pColData) ;
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols, // WARNING: do not use info.numOfCols,
...@@ -103,13 +101,7 @@ static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { ...@@ -103,13 +101,7 @@ static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) { for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { colDataDestroy(pColInfoData);
taosMemoryFreeClear(pColInfoData->varmeta.offset);
} else {
taosMemoryFreeClear(pColInfoData->nullbitmap);
}
taosMemoryFreeClear(pColInfoData->pData);
} }
taosArrayDestroy(pBlock->pDataBlock); taosArrayDestroy(pBlock->pDataBlock);
......
...@@ -101,6 +101,54 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u ...@@ -101,6 +101,54 @@ static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, u
((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \ ((IS_VAR_DATA_TYPE((p1_)->info.type)) ? ((p1_)->pData + (p1_)->varmeta.offset[(r_)]) \
: ((p1_)->pData + ((r_) * (p1_)->info.bytes))) : ((p1_)->pData + ((r_) * (p1_)->info.bytes)))
static FORCE_INLINE void colDataAppendNULL(SColumnInfoData* pColumnInfoData, uint32_t currentRow) {
// There is a placehold for each NULL value of binary or nchar type.
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type.
} else {
colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow);
}
pColumnInfoData->hasNull = true;
}
static FORCE_INLINE int32_t colDataAppendInt8(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int8_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_TINYINT ||
pColumnInfoData->info.type == TSDB_DATA_TYPE_UTINYINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_BOOL);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int8_t*)p = *(int8_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt16(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int16_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_SMALLINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_USMALLINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int16_t*)p = *(int16_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt32(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int32_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_INT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int32_t*)p = *(int32_t*)v;
}
static FORCE_INLINE int32_t colDataAppendInt64(SColumnInfoData* pColumnInfoData, uint32_t currentRow, int64_t* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_BIGINT || pColumnInfoData->info.type == TSDB_DATA_TYPE_UBIGINT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(int64_t*)p = *(int64_t*)v;
}
static FORCE_INLINE int32_t colDataAppendFloat(SColumnInfoData* pColumnInfoData, uint32_t currentRow, float* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_FLOAT);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(float*)p = *(float*)v;
}
static FORCE_INLINE int32_t colDataAppendDouble(SColumnInfoData* pColumnInfoData, uint32_t currentRow, double* v) {
ASSERT(pColumnInfoData->info.type == TSDB_DATA_TYPE_DOUBLE);
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
*(double*)p = *(double*)v;
}
int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull); int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, const char* pData, bool isNull);
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
uint32_t numOfRow2); uint32_t numOfRow2);
......
...@@ -192,7 +192,6 @@ enum { ...@@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp) TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
......
...@@ -27,16 +27,22 @@ extern "C" { ...@@ -27,16 +27,22 @@ extern "C" {
struct SqlFunctionCtx; struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
typedef struct SFunctionNode SFunctionNode; struct SFunctionNode;
typedef struct SScalarParam SScalarParam;
typedef struct SFuncExecEnv { typedef struct SFuncExecEnv {
int32_t calcMemSize; int32_t calcMemSize;
} SFuncExecEnv; } SFuncExecEnv;
typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv); typedef bool (*FExecGetEnv)(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); typedef bool (*FExecInit)(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo);
typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx); typedef void (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx); typedef void (*FExecFinalize)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
FScalarExecProcess process;
} SScalarFuncExecFuncs;
typedef struct SFuncExecFuncs { typedef struct SFuncExecFuncs {
FExecGetEnv getEnv; FExecGetEnv getEnv;
...@@ -191,6 +197,7 @@ typedef struct SqlFunctionCtx { ...@@ -191,6 +197,7 @@ typedef struct SqlFunctionCtx {
SPoint1 start; SPoint1 start;
SPoint1 end; SPoint1 end;
SFuncExecFuncs fpSet; SFuncExecFuncs fpSet;
SScalarFuncExecFuncs sfp;
} SqlFunctionCtx; } SqlFunctionCtx;
enum { enum {
...@@ -203,7 +210,7 @@ enum { ...@@ -203,7 +210,7 @@ enum {
}; };
typedef struct tExprNode { typedef struct tExprNode {
uint8_t nodeType; int32_t nodeType;
union { union {
struct { struct {
int32_t optr; // binary operator int32_t optr; // binary operator
...@@ -219,7 +226,7 @@ typedef struct tExprNode { ...@@ -219,7 +226,7 @@ typedef struct tExprNode {
char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor char functionName[FUNCTIONS_NAME_MAX_LENGTH]; // todo refactor
int32_t functionId; int32_t functionId;
int32_t num; int32_t num;
SFunctionNode *pFunctNode; struct SFunctionNode *pFunctNode;
// Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the // Note that the attribute of pChild is not the parameter of function, it is the columns that involved in the
// calculation instead. // calculation instead.
// E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes. // E.g., Cov(col1, col2), the column information, w.r.t. the col1 and col2, is kept in pChild nodes.
...@@ -227,6 +234,10 @@ typedef struct tExprNode { ...@@ -227,6 +234,10 @@ typedef struct tExprNode {
// operator and is kept in the attribute of _node. // operator and is kept in the attribute of _node.
struct tExprNode **pChild; struct tExprNode **pChild;
} _function; } _function;
struct {
struct SNode* pRootNode;
} _optrRoot;
}; };
} tExprNode; } tExprNode;
...@@ -250,25 +261,11 @@ typedef struct SAggFunctionInfo { ...@@ -250,25 +261,11 @@ typedef struct SAggFunctionInfo {
int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId); int32_t (*dataReqFunc)(SqlFunctionCtx *pCtx, STimeWindow* w, int32_t colId);
} SAggFunctionInfo; } SAggFunctionInfo;
typedef struct SScalarParam { struct SScalarParam {
void *data;
union {
SColumnInfoData *columnData; SColumnInfoData *columnData;
void *data; SHashObj *pHashFilter;
} orig; int32_t numOfRows;
char *bitmap; };
bool dataInBlock;
int32_t num;
int32_t type;
int32_t bytes;
} SScalarParam;
typedef struct SScalarFunctionInfo {
char name[FUNCTIONS_NAME_MAX_LENGTH];
int8_t type; // scalar function or aggregation function
uint32_t functionId; // index of scalar function
void (*process)(struct SScalarParam* pOutput, size_t numOfInput, const struct SScalarParam *pInput);
} SScalarFunctionInfo;
typedef struct SMultiFunctionsDesc { typedef struct SMultiFunctionsDesc {
bool stableQuery; bool stableQuery;
......
...@@ -103,13 +103,6 @@ struct SqlFunctionCtx; ...@@ -103,13 +103,6 @@ struct SqlFunctionCtx;
struct SResultRowEntryInfo; struct SResultRowEntryInfo;
struct STimeWindow; struct STimeWindow;
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef struct SScalarFuncExecFuncs {
FScalarExecProcess process;
} SScalarFuncExecFuncs;
int32_t fmFuncMgtInit(); int32_t fmFuncMgtInit();
void fmFuncMgtDestroy(); void fmFuncMgtDestroy();
......
...@@ -40,7 +40,7 @@ int32_t scalarGetOperatorParamNum(EOperatorType type); ...@@ -40,7 +40,7 @@ int32_t scalarGetOperatorParamNum(EOperatorType type);
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type); int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type);
int32_t vectorGetConvertType(int32_t type1, int32_t type2); int32_t vectorGetConvertType(int32_t type1, int32_t type2);
int32_t vectorConvertImpl(SScalarParam* pIn, SScalarParam* pOut); int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut);
int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t absFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput); int32_t logFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
......
...@@ -82,8 +82,12 @@ typedef struct { ...@@ -82,8 +82,12 @@ typedef struct {
SHashObj* pHash; // groupId to tbuid SHashObj* pHash; // groupId to tbuid
} STaskSinkTb; } STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef struct { typedef struct {
int8_t reserved; int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
} STaskSinkSma; } STaskSinkSma;
typedef struct { typedef struct {
...@@ -156,7 +160,8 @@ typedef struct { ...@@ -156,7 +160,8 @@ typedef struct {
STaskDispatcherShuffle shuffleDispatcher; STaskDispatcherShuffle shuffleDispatcher;
}; };
// state storage // application storage
void* ahandle;
} SStreamTask; } SStreamTask;
......
...@@ -235,7 +235,7 @@ void initMsgHandleFp(); ...@@ -235,7 +235,7 @@ void initMsgHandleFp();
TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db, TAOS* taos_connect_internal(const char* ip, const char* user, const char* pass, const char* auth, const char* db,
uint16_t port); uint16_t port);
void* doFetchRow(SRequestObj* pRequest); void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr);
int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows); int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32_t numOfCols, int32_t numOfRows);
......
...@@ -545,7 +545,33 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c ...@@ -545,7 +545,33 @@ TAOS* taos_connect_l(const char* ip, int ipLen, const char* user, int userLen, c
return taos_connect(ipStr, userStr, passStr, dbStr, port); return taos_connect(ipStr, userStr, passStr, dbStr, port);
} }
void* doFetchRow(SRequestObj* pRequest) { static void doSetOneRowPtr(SReqResultInfo* pResultInfo) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) {
SResultColumn* pCol = &pResultInfo->pCol[i];
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
if (IS_VAR_DATA_TYPE(type)) {
if (pCol->offset[pResultInfo->current] != -1) {
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
pResultInfo->length[i] = varDataLen(pStart);
pResultInfo->row[i] = varDataVal(pStart);
} else {
pResultInfo->row[i] = NULL;
}
} else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
} else {
pResultInfo->row[i] = NULL;
}
}
}
}
void* doFetchRow(SRequestObj* pRequest, bool setupOneRowPtr) {
assert(pRequest != NULL); assert(pRequest != NULL);
SReqResultInfo* pResultInfo = &pRequest->body.resInfo; SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
...@@ -555,17 +581,20 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -555,17 +581,20 @@ void* doFetchRow(SRequestObj* pRequest) {
if (pRequest->type == TDMT_VND_QUERY) { if (pRequest->type == TDMT_VND_QUERY) {
// All data has returned to App already, no need to try again // All data has returned to App already, no need to try again
if (pResultInfo->completed) { if (pResultInfo->completed) {
pResultInfo->numOfRows = 0;
return NULL; return NULL;
} }
SReqResultInfo* pResInfo = &pRequest->body.resInfo; SReqResultInfo* pResInfo = &pRequest->body.resInfo;
pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData); pRequest->code = schedulerFetchRows(pRequest->body.queryJob, (void**)&pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL; return NULL;
} }
pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData); pRequest->code = setQueryResultFromRsp(&pRequest->body.resInfo, (SRetrieveTableRsp*)pResInfo->pData);
if (pRequest->code != TSDB_CODE_SUCCESS) { if (pRequest->code != TSDB_CODE_SUCCESS) {
pResultInfo->numOfRows = 0;
return NULL; return NULL;
} }
...@@ -633,41 +662,11 @@ void* doFetchRow(SRequestObj* pRequest) { ...@@ -633,41 +662,11 @@ void* doFetchRow(SRequestObj* pRequest) {
} }
_return: _return:
if (setupOneRowPtr) {
for (int32_t i = 0; i < pResultInfo->numOfCols; ++i) { doSetOneRowPtr(pResultInfo);
SResultColumn* pCol = &pResultInfo->pCol[i]; pResultInfo->current += 1;
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
if (IS_VAR_DATA_TYPE(type)) {
if (pCol->offset[pResultInfo->current] != -1) {
char* pStart = pResultInfo->pCol[i].offset[pResultInfo->current] + pResultInfo->pCol[i].pData;
pResultInfo->length[i] = varDataLen(pStart);
pResultInfo->row[i] = varDataVal(pStart);
if (type == TSDB_DATA_TYPE_NCHAR) {
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(pResultInfo->convertBuf[i]));
ASSERT(len <= bytes);
pResultInfo->row[i] = varDataVal(pResultInfo->convertBuf[i]);
varDataSetLen(pResultInfo->convertBuf[i], len);
pResultInfo->length[i] = len;
}
} else {
pResultInfo->row[i] = NULL;
}
} else {
if (!colDataIsNull_f(pCol->nullbitmap, pResultInfo->current)) {
pResultInfo->row[i] = pResultInfo->pCol[i].pData + bytes * pResultInfo->current;
} else {
pResultInfo->row[i] = NULL;
}
}
} }
pResultInfo->current += 1;
return pResultInfo->row; return pResultInfo->row;
} }
...@@ -681,12 +680,6 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) { ...@@ -681,12 +680,6 @@ static int32_t doPrepareResPtr(SReqResultInfo* pResInfo) {
if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) { if (pResInfo->row == NULL || pResInfo->pCol == NULL || pResInfo->length == NULL || pResInfo->convertBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
for(int32_t i = 0; i < pResInfo->numOfCols; ++i) {
if(pResInfo->fields[i].type == TSDB_DATA_TYPE_NCHAR) {
pResInfo->convertBuf[i] = taosMemoryCalloc(1, NCHAR_WIDTH_TO_BYTES(pResInfo->fields[i].bytes));
}
}
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -723,6 +716,35 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32 ...@@ -723,6 +716,35 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
pStart += colLength[i]; pStart += colLength[i];
} }
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t type = pResultInfo->fields[i].type;
int32_t bytes = pResultInfo->fields[i].bytes;
if (type == TSDB_DATA_TYPE_NCHAR) {
char* p = taosMemoryRealloc(pResultInfo->convertBuf[i], colLength[i]);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pResultInfo->convertBuf[i] = p;
SResultColumn* pCol = &pResultInfo->pCol[i];
for (int32_t j = 0; j < numOfRows; ++j) {
if (pCol->offset[j] != -1) {
pStart = pCol->offset[j] + pCol->pData;
int32_t len = taosUcs4ToMbs((TdUcs4*)varDataVal(pStart), varDataLen(pStart), varDataVal(p));
ASSERT(len <= bytes);
varDataSetLen(p, len);
pCol->offset[j] = (p - pResultInfo->convertBuf[i]);
p += (len + VARSTR_HEADER_SIZE);
}
}
pResultInfo->pCol[i].pData = pResultInfo->convertBuf[i];
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -138,20 +138,20 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) { ...@@ -138,20 +138,20 @@ TAOS_RES *taos_query(TAOS *taos, const char *sql) {
return taos_query_l(taos, sql, (int32_t) strlen(sql)); return taos_query_l(taos, sql, (int32_t) strlen(sql));
} }
TAOS_ROW taos_fetch_row(TAOS_RES *pRes) { TAOS_ROW taos_fetch_row(TAOS_RES *res) {
if (pRes == NULL) { if (res == NULL) {
return NULL; return NULL;
} }
SRequestObj *pRequest = (SRequestObj *) pRes; SRequestObj *pRequest = (SRequestObj *) res;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT || if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT || pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS || pRequest->code != TSDB_CODE_SUCCESS ||
taos_num_fields(pRes) == 0) { taos_num_fields(res) == 0) {
return NULL; return NULL;
} }
return doFetchRow(pRequest); return doFetchRow(pRequest, true);
} }
int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) { int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields) {
...@@ -246,6 +246,7 @@ int* taos_fetch_lengths(TAOS_RES *res) { ...@@ -246,6 +246,7 @@ int* taos_fetch_lengths(TAOS_RES *res) {
return ((SRequestObj*) res)->body.resInfo.length; return ((SRequestObj*) res)->body.resInfo.length;
} }
// todo intergrate with tDataTypes
const char *taos_data_type(int type) { const char *taos_data_type(int type) {
switch (type) { switch (type) {
case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL"; case TSDB_DATA_TYPE_NULL: return "TSDB_DATA_TYPE_NULL";
...@@ -256,9 +257,11 @@ const char *taos_data_type(int type) { ...@@ -256,9 +257,11 @@ const char *taos_data_type(int type) {
case TSDB_DATA_TYPE_BIGINT: return "TSDB_DATA_TYPE_BIGINT"; case TSDB_DATA_TYPE_BIGINT: return "TSDB_DATA_TYPE_BIGINT";
case TSDB_DATA_TYPE_FLOAT: return "TSDB_DATA_TYPE_FLOAT"; case TSDB_DATA_TYPE_FLOAT: return "TSDB_DATA_TYPE_FLOAT";
case TSDB_DATA_TYPE_DOUBLE: return "TSDB_DATA_TYPE_DOUBLE"; case TSDB_DATA_TYPE_DOUBLE: return "TSDB_DATA_TYPE_DOUBLE";
case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_BINARY"; case TSDB_DATA_TYPE_VARCHAR: return "TSDB_DATA_TYPE_VARCHAR";
// case TSDB_DATA_TYPE_BINARY: return "TSDB_DATA_TYPE_VARCHAR";
case TSDB_DATA_TYPE_TIMESTAMP: return "TSDB_DATA_TYPE_TIMESTAMP"; case TSDB_DATA_TYPE_TIMESTAMP: return "TSDB_DATA_TYPE_TIMESTAMP";
case TSDB_DATA_TYPE_NCHAR: return "TSDB_DATA_TYPE_NCHAR"; case TSDB_DATA_TYPE_NCHAR: return "TSDB_DATA_TYPE_NCHAR";
case TSDB_DATA_TYPE_JSON: return "TSDB_DATA_TYPE_JSON";
default: return "UNKNOWN"; default: return "UNKNOWN";
} }
} }
...@@ -316,11 +319,37 @@ void taos_stop_query(TAOS_RES *res) { ...@@ -316,11 +319,37 @@ void taos_stop_query(TAOS_RES *res) {
} }
bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) { bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col) {
return false; SRequestObj* pRequestObj = res;
SReqResultInfo* pResultInfo = &pRequestObj->body.resInfo;
if (col >= pResultInfo->numOfCols || col < 0 || row >= pResultInfo->numOfRows || row < 0) {
return true;
}
SResultColumn* pCol = &pRequestObj->body.resInfo.pCol[col];
return colDataIsNull_f(pCol->nullbitmap, row);
} }
int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) { int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows) {
if (res == NULL) {
return 0; return 0;
}
SRequestObj *pRequest = (SRequestObj *) res;
if (pRequest->type == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
pRequest->type == TSDB_SQL_INSERT ||
pRequest->code != TSDB_CODE_SUCCESS ||
taos_num_fields(res) == 0) {
return 0;
}
doFetchRow(pRequest, false);
// TODO refactor
SReqResultInfo* pResultInfo = &pRequest->body.resInfo;
pResultInfo->current = pResultInfo->numOfRows;
*rows = pResultInfo->row;
return pResultInfo->numOfRows;
} }
int taos_validate_sql(TAOS *taos, const char *sql) { int taos_validate_sql(TAOS *taos, const char *sql) {
......
...@@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL: ...@@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL:
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
printf("call update ep %d\n", epoch); /*printf("call update ep %d\n", epoch);*/
bool set = false; bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
......
...@@ -1241,6 +1241,16 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) { ...@@ -1241,6 +1241,16 @@ size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize) {
return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock)); return pageSize / (blockDataGetSerialRowSize(pBlock) + blockDataGetSerialMetaSize(pBlock));
} }
void colDataDestroy(SColumnInfoData* pColData) {
if (IS_VAR_DATA_TYPE(pColData->info.type)) {
taosMemoryFree(pColData->varmeta.offset);
} else {
taosMemoryFree(pColData->nullbitmap);
}
taosMemoryFree(pColData->pData);
}
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
int64_t tbUid = pBlock->info.uid; int64_t tbUid = pBlock->info.uid;
int16_t numOfCols = pBlock->info.numOfCols; int16_t numOfCols = pBlock->info.numOfCols;
......
...@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) { ...@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, VND_VGID); dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, vmProcessFetchMsg, VND_VGID);
......
...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); ...@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); ...@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) { ...@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, sz) < 0) return -1; if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i); SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray); int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1; if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j); SStreamTask *pTask = taosArrayGetP(pArray, j);
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1; if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
} }
} }
...@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) { ...@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) { if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(SArray)); pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t innerSz; int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1; if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask)); SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
for (int32_t j = 0; j < innerSz; j++) { for (int32_t j = 0; j < innerSz; j++) {
SStreamTask task; SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1; if (pTask == NULL) return -1;
taosArrayPush(pArray, &task); if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
taosArrayPush(pArray, &pTask);
} }
taosArrayPush(pObj->tasks, pArray); taosArrayPush(pObj->tasks, &pArray);
} }
} }
......
...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { ...@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup; return pVgroup;
} }
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
SSdb* pSdb = pMnode->pSdb; SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) { if (pPlan == NULL) {
...@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// only for inplace // only for inplace
pTask->sinkType = TASK_SINK__SHOW; pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0; pTask->showSink.reserved = 0;
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
}
} else { } else {
pTask->sinkType = TASK_SINK__NONE; pTask->sinkType = TASK_SINK__NONE;
} }
......
...@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {} ...@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {}
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; int32_t size =
sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER; if (pRaw == NULL) goto _OVER;
...@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre ...@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
......
...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { ...@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast ...@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1; return -1;
} }
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1; return -1;
} }
...@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe ...@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
} }
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans); mndTransDrop(pTrans);
return -1; return -1;
......
...@@ -19,15 +19,14 @@ ...@@ -19,15 +19,14 @@
#include "tmallocator.h" #include "tmallocator.h"
// #include "sync.h" // #include "sync.h"
#include "tcoding.h" #include "tcoding.h"
#include "tdatablock.h"
#include "tfs.h" #include "tfs.h"
#include "tlist.h" #include "tlist.h"
#include "tlockfree.h" #include "tlockfree.h"
#include "tmacro.h" #include "tmacro.h"
#include "wal.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeQuery.h" #include "vnodeQuery.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -198,10 +197,13 @@ int tqCommit(STQ*); ...@@ -198,10 +197,13 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -473,10 +473,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -473,10 +473,17 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
} }
tCoderClear(&decoder); tCoderClear(&decoder);
// exec
if (tqExpandTask(pTq, pTask, 4) < 0) { if (tqExpandTask(pTq, pTask, 4) < 0) {
ASSERT(0); ASSERT(0);
} }
// sink
pTask->ahandle = pTq->pVnode;
if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle = smaHandleRes;
}
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
return 0; return 0;
...@@ -497,11 +504,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { ...@@ -497,11 +504,9 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return 0; return 0;
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead));
SStreamTaskExecReq req; SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msgstr, &req); tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
......
...@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in fetch queue is processing"); vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
...@@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -65,10 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg); return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC: case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg); return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen);
case TDMT_VND_STREAM_TRIGGER: case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT: case TDMT_VND_QUERY_HEARTBEAT:
......
...@@ -15,6 +15,11 @@ ...@@ -15,6 +15,11 @@
#include "vnd.h" #include "vnd.h"
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
// TODO
blockDebugShowData(data);
}
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) { void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
SRpcMsg *pRpc; SRpcMsg *pRpc;
...@@ -178,6 +183,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -178,6 +183,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg->contLen - sizeof(SMsgHead)) < 0) { pMsg->contLen - sizeof(SMsgHead)) < 0) {
} }
} break; } break;
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 0 #if 0
SSmaCfg vCreateSmaReq = {0}; SSmaCfg vCreateSmaReq = {0};
......
add_subdirectory(transport) add_subdirectory(transport)
add_subdirectory(sync) add_subdirectory(sync)
# add_subdirectory(tdb) add_subdirectory(tdb)
add_subdirectory(index) add_subdirectory(index)
add_subdirectory(wal) add_subdirectory(wal)
add_subdirectory(parser) add_subdirectory(parser)
......
...@@ -469,7 +469,7 @@ typedef struct SOptrBasicInfo { ...@@ -469,7 +469,7 @@ typedef struct SOptrBasicInfo {
int32_t* rowCellInfoOffset; // offset value for each row result cell info int32_t* rowCellInfoOffset; // offset value for each row result cell info
SqlFunctionCtx* pCtx; SqlFunctionCtx* pCtx;
SSDataBlock* pRes; SSDataBlock* pRes;
int32_t capacity; int32_t capacity; // TODO remove it
} SOptrBasicInfo; } SOptrBasicInfo;
//TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset //TODO move the resultrowsiz together with SOptrBasicInfo:rowCellInfoOffset
......
...@@ -1246,17 +1246,40 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction ...@@ -1246,17 +1246,40 @@ static void doAggregateImpl(SOperatorInfo* pOperator, TSKEY startTs, SqlFunction
} }
} }
static void projectApplyFunctions(SSDataBlock* pResult, SqlFunctionCtx *pCtx, int32_t numOfOutput) { static void projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx *pCtx, int32_t numOfOutput) {
for (int32_t k = 0; k < numOfOutput; ++k) { for (int32_t k = 0; k < numOfOutput; ++k) {
if (pCtx[k].fpSet.init == NULL) { // it is a project query if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, k);
colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows); colDataAssign(pColInfoData, pCtx[k].input.pData[0], pCtx[k].input.numOfRows);
} else { // TODO: arithmetic and other process.
pResult->info.rows = pCtx[0].input.numOfRows;
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_OPERATOR) {
SArray* pBlockList = taosArrayInit(4, POINTER_BYTES);
taosArrayPush(pBlockList, &pSrcBlock);
SScalarParam dest = {0};
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
pResult->info.rows = dest.numOfRows;
taosArrayDestroy(pBlockList);
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_FUNCTION) {
ASSERT(!fmIsAggFunc(pCtx->functionId));
SScalarParam p = {.numOfRows = pSrcBlock->info.rows};
int32_t slotId = pExpr[k].base.pParam[0].pCol->slotId;
p.columnData = taosArrayGet(pSrcBlock->pDataBlock, slotId);
SScalarParam dest = {0};
dest.columnData = taosArrayGet(pResult->pDataBlock, k);
pCtx[k].sfp.process(&p, 1, &dest);
pResult->info.rows = dest.numOfRows;
} else {
ASSERT(0); ASSERT(0);
} }
} }
pResult->info.rows = pCtx[0].input.numOfRows;
} }
void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs, void doTimeWindowInterpolation(SOperatorInfo* pOperator, SOptrBasicInfo* pInfo, SArray* pDataBlock, TSKEY prevTs,
...@@ -2013,107 +2036,6 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) { ...@@ -2013,107 +2036,6 @@ static int32_t setCtxTagColumnInfo(SqlFunctionCtx *pCtx, int32_t numOfOutput) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static SqlFunctionCtx* createSqlFunctionCtx(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
int32_t** rowCellInfoOffset) {
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) {
return NULL;
}
*rowCellInfoOffset = taosMemoryCalloc(numOfOutput, sizeof(int32_t));
if (*rowCellInfoOffset == 0) {
taosMemoryFreeClear(pFuncCtx);
return NULL;
}
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprBasicInfo *pFunct = &pExpr[i].base;
SqlFunctionCtx* pCtx = &pFuncCtx[i];
#if 0
SColIndex *pIndex = &pFunct->colInfo;
if (TSDB_COL_REQ_NULL(pIndex->flag)) {
pCtx->requireNull = true;
pIndex->flag &= ~(TSDB_COL_NULL);
} else {
pCtx->requireNull = false;
}
#endif
// pCtx->inputBytes = pFunct->colBytes;
// pCtx->inputType = pFunct->colType;
pCtx->ptsOutputBuf = NULL;
pCtx->resDataInfo.bytes = pFunct->resSchema.bytes;
pCtx->resDataInfo.type = pFunct->resSchema.type;
pCtx->order = pQueryAttr->order.order;
// pCtx->functionId = pFunct->functionId;
pCtx->stableQuery = pQueryAttr->stableQuery;
// pCtx->resDataInfo.interBufSize = pFunct->interBytes;
pCtx->start.key = INT64_MIN;
pCtx->end.key = INT64_MIN;
pCtx->numOfParams = pFunct->numOfParams;
for (int32_t j = 0; j < pCtx->numOfParams; ++j) {
int16_t type = pFunct->pParam[j].param.nType;
int16_t bytes = pFunct->pParam[j].param.nType;
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
// taosVariantCreateFromBinary(&pCtx->param[j], pFunct->param[j].pz, bytes, type);
} else {
// taosVariantCreateFromBinary(&pCtx->param[j], (char *)&pFunct->param[j].i, bytes, type);
}
}
// set the order information for top/bottom query
int32_t functionId = pCtx->functionId;
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
int32_t f = getExprFunctionId(&pExpr[0]);
assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY);
pCtx->param[2].i = pQueryAttr->order.order;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[3].i = functionId;
pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[1].i = pQueryAttr->order.col.colId;
} else if (functionId == FUNCTION_INTERP) {
pCtx->param[2].i = (int8_t)pQueryAttr->fillType;
if (pQueryAttr->fillVal != NULL) {
if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], pCtx->inputBytes, pCtx->inputType);
}
}
}
} else if (functionId == FUNCTION_TS_COMP) {
pCtx->param[0].i = pQueryAttr->vgId; //TODO this should be the parameter from client
pCtx->param[0].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_TWA) {
pCtx->param[1].i = pQueryAttr->window.skey;
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT;
pCtx->param[2].i = pQueryAttr->window.ekey;
pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT;
} else if (functionId == FUNCTION_ARITHM) {
// pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i);
}
}
// for(int32_t i = 1; i < numOfOutput; ++i) {
// (*rowCellInfoOffset)[i] = (int32_t)((*rowCellInfoOffset)[i - 1] + sizeof(SResultRowEntryInfo) + pExpr[i - 1].base.interBytes);
// }
setCtxTagColumnInfo(pFuncCtx, numOfOutput);
return pFuncCtx;
}
static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) { static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset) {
SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx)); SqlFunctionCtx * pFuncCtx = (SqlFunctionCtx *)taosMemoryCalloc(numOfOutput, sizeof(SqlFunctionCtx));
if (pFuncCtx == NULL) { if (pFuncCtx == NULL) {
...@@ -2132,15 +2054,22 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num ...@@ -2132,15 +2054,22 @@ static SqlFunctionCtx* createSqlFunctionCtx_rv(SExprInfo* pExprInfo, int32_t num
SExprBasicInfo *pFunct = &pExpr->base; SExprBasicInfo *pFunct = &pExpr->base;
SqlFunctionCtx* pCtx = &pFuncCtx[i]; SqlFunctionCtx* pCtx = &pFuncCtx[i];
if (pExpr->pExpr->_function.pFunctNode != NULL) { pCtx->functionId = -1;
if (pExpr->pExpr->nodeType == QUERY_NODE_FUNCTION) {
SFuncExecEnv env = {0}; SFuncExecEnv env = {0};
pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId; pCtx->functionId = pExpr->pExpr->_function.pFunctNode->funcId;
if (fmIsAggFunc(pCtx->functionId)) {
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet); fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env); pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);
pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else { } else {
pCtx->functionId = -1; fmGetScalarFuncExecFuncs(pCtx->functionId, &pCtx->sfp);
}
pCtx->resDataInfo.interBufSize = env.calcMemSize;
} else if (pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
} else if (pExpr->pExpr->nodeType == QUERY_NODE_OPERATOR) {
} }
pCtx->input.numOfInputCols = pFunct->numOfParams; pCtx->input.numOfInputCols = pFunct->numOfParams;
...@@ -6654,7 +6583,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6654,7 +6583,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
blockDataCleanup(pRes); blockDataCleanup(pRes);
if (pProjectInfo->existDataBlock) { // TODO refactor if (pProjectInfo->existDataBlock) { // TODO refactor
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
SSDataBlock* pBlock = pProjectInfo->existDataBlock; SSDataBlock* pBlock = pProjectInfo->existDataBlock;
pProjectInfo->existDataBlock = NULL; pProjectInfo->existDataBlock = NULL;
*newgroup = true; *newgroup = true;
...@@ -6668,9 +6596,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6668,9 +6596,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
pRes->info.rows = getNumOfResult(pInfo->pCtx, pOperator->numOfOutput, NULL);
if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) { if (pRes->info.rows >= pProjectInfo->binfo.capacity*0.8) {
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput); resetResultRowEntryResult(pInfo->pCtx, pOperator->numOfOutput);
...@@ -6713,15 +6639,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) ...@@ -6713,15 +6639,15 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
// the pDataBlock are always the same one, no need to call this again // the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC); setInputDataBlock(pOperator, pInfo->pCtx, pBlock, TSDB_ORDER_ASC);
updateOutputBuf(pInfo, &pInfo->capacity, pBlock->info.rows); blockDataEnsureCapacity(pInfo->pRes, pInfo->pRes->info.rows + pBlock->info.rows);
projectApplyFunctions(pInfo->pRes, pInfo->pCtx, pOperator->numOfOutput); projectApplyFunctions(pOperator->pExpr, pInfo->pRes, pBlock, pInfo->pCtx, pOperator->numOfOutput);
if (pRes->info.rows >= pProjectInfo->threshold) { if (pRes->info.rows >= pOperator->resultInfo.threshold) {
break; break;
} }
} }
copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput); // copyTsColoum(pRes, pInfo->pCtx, pOperator->numOfOutput);
return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL; return (pInfo->pRes->info.rows > 0)? pInfo->pRes:NULL;
} }
...@@ -7811,7 +7737,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* ...@@ -7811,7 +7737,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
...@@ -7836,7 +7762,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper ...@@ -7836,7 +7762,7 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper
SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1; pInfo->colIndex = -1;
pInfo->reptScan = false; pInfo->reptScan = false;
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
...@@ -7901,7 +7827,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo ...@@ -7901,7 +7827,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
...@@ -7925,7 +7851,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim ...@@ -7925,7 +7851,7 @@ SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntim
SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) {
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo)); STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
// pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity); // pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pResultInfo->capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initResultRowInfo(&pInfo->binfo.resultRowInfo, 8);
...@@ -8533,16 +8459,18 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -8533,16 +8459,18 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
// it is a project query, or group by column // it is a project query, or group by column
if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) { if (nodeType(pTargetNode->pExpr) == QUERY_NODE_COLUMN) {
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr; SColumnNode* pColNode = (SColumnNode*) pTargetNode->pExpr;
SDataType* pType = &pColNode->node.resType; SDataType* pType = &pColNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName); pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pColNode->colName);
pCol->slotId = pColNode->slotId; pCol->slotId = pColNode->slotId; // TODO refactor
pCol->bytes = pType->bytes; pCol->bytes = pType->bytes;
pCol->type = pType->type; pCol->type = pType->type;
pCol->scale = pType->scale; pCol->scale = pType->scale;
pCol->precision = pType->precision; pCol->precision = pType->precision;
} else { } else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_FUNCTION) {
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr; SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
SDataType* pType = &pFuncNode->node.resType; SDataType* pType = &pFuncNode->node.resType;
...@@ -8556,7 +8484,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -8556,7 +8484,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList); int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
for (int32_t j = 0; j < numOfParam; ++j) { for (int32_t j = 0; j < numOfParam; ++j) {
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j); SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
SColumnNode* pcn = (SColumnNode*)p1; SColumnNode* pcn = (SColumnNode*)p1; // TODO refactor
pCol->slotId = pcn->slotId; pCol->slotId = pcn->slotId;
pCol->bytes = pcn->node.resType.bytes; pCol->bytes = pcn->node.resType.bytes;
...@@ -8565,6 +8493,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* ...@@ -8565,6 +8493,22 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
pCol->precision = pcn->node.resType.precision; pCol->precision = pcn->node.resType.precision;
pCol->dataBlockId = pcn->dataBlockId; pCol->dataBlockId = pcn->dataBlockId;
} }
} else if (nodeType(pTargetNode->pExpr) == QUERY_NODE_OPERATOR) {
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
SOperatorNode* pNode = (SOperatorNode*) pTargetNode->pExpr;
SDataType* pType = &pNode->node.resType;
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale, pType->precision, pNode->node.aliasName);
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
pCol->slotId = pTargetNode->slotId; // TODO refactor
pCol->bytes = pType->bytes;
pCol->type = pType->type;
pCol->scale = pType->scale;
pCol->precision = pType->precision;
} else {
ASSERT(0);
} }
} }
......
...@@ -25,23 +25,23 @@ extern "C" { ...@@ -25,23 +25,23 @@ extern "C" {
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
void functionFinalize(SqlFunctionCtx *pCtx); void functionFinalize(SqlFunctionCtx *pCtx);
bool getCountFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getCountFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void countFunction(SqlFunctionCtx *pCtx); void countFunction(SqlFunctionCtx *pCtx);
bool getSumFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSumFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void sumFunction(SqlFunctionCtx *pCtx); void sumFunction(SqlFunctionCtx *pCtx);
bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool minFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool maxFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
bool getMinmaxFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getMinmaxFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void minFunction(SqlFunctionCtx* pCtx); void minFunction(SqlFunctionCtx* pCtx);
void maxFunction(SqlFunctionCtx *pCtx); void maxFunction(SqlFunctionCtx *pCtx);
bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getStddevFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void stddevFunction(SqlFunctionCtx* pCtx); void stddevFunction(SqlFunctionCtx* pCtx);
void stddevFinalize(SqlFunctionCtx* pCtx); void stddevFinalize(SqlFunctionCtx* pCtx);
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getFirstLastFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
void firstFunction(SqlFunctionCtx *pCtx); void firstFunction(SqlFunctionCtx *pCtx);
void lastFunction(SqlFunctionCtx *pCtx); void lastFunction(SqlFunctionCtx *pCtx);
......
...@@ -331,6 +331,14 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) { ...@@ -331,6 +331,14 @@ int32_t stubCheckAndGetResultType(SFunctionNode* pFunc) {
case FUNCTION_TYPE_CONCAT: case FUNCTION_TYPE_CONCAT:
// todo // todo
break; break;
case FUNCTION_TYPE_ABS: {
SColumnNode* pParam = nodesListGetNode(pFunc->pParameterList, 0);
int32_t paraType = pParam->node.resType.type;
pFunc->node.resType = (SDataType) { .bytes = tDataTypes[paraType].bytes, .type = paraType };
break;
}
default: default:
ASSERT(0); // to found the fault ASAP. ASSERT(0); // to found the fault ASAP.
} }
......
...@@ -3078,8 +3078,8 @@ static void arithmetic_function(SqlFunctionCtx *pCtx) { ...@@ -3078,8 +3078,8 @@ static void arithmetic_function(SqlFunctionCtx *pCtx) {
GET_RES_INFO(pCtx)->numOfRes += pCtx->size; GET_RES_INFO(pCtx)->numOfRes += pCtx->size;
//SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz; //SScalarFunctionSupport *pSup = (SScalarFunctionSupport *)pCtx->param[1].pz;
SScalarParam output = {0}; // SScalarParam output = {0};
output.data = pCtx->pOutput; // output.data = pCtx->pOutput;
//evaluateExprNodeTree(pSup->pExprInfo->pExpr, pCtx->size, &output, pSup, getArithColumnData); //evaluateExprNodeTree(pSup->pExprInfo->pExpr, pCtx->size, &output, pSup, getArithColumnData);
} }
......
...@@ -43,10 +43,12 @@ typedef struct SScalarCtx { ...@@ -43,10 +43,12 @@ typedef struct SScalarCtx {
#define SCL_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define SCL_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
#define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) #define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
#define GET_PARAM_BYTES(_c) ((_c)->pColumnInfoData->info.bytes)
int32_t sclMoveParamListData(SScalarParam *params, int32_t listNum, int32_t idx);
bool sclIsNull(SScalarParam* param, int32_t idx);
void sclSetNull(SScalarParam* param, int32_t idx);
void sclFreeParam(SScalarParam *param); void sclFreeParam(SScalarParam *param);
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -22,19 +22,7 @@ extern "C" { ...@@ -22,19 +22,7 @@ extern "C" {
#include "function.h" #include "function.h"
#include "scalar.h" #include "scalar.h"
typedef struct SScalarFunctionSupport {
struct SExprInfo *pExprInfo;
int32_t numOfCols;
SColumnInfo *colList;
void *exprList; // client side used
int32_t offset;
char** data;
} SScalarFunctionSupport;
extern struct SScalarFunctionInfo scalarFunc[8];
int32_t evaluateExprNodeTree(tExprNode* pExprs, int32_t numOfRows, SScalarParam* pOutput,
void* param, char* (*getSourceDataBlock)(void*, const char*, int32_t));
#ifdef __cplusplus #ifdef __cplusplus
......
...@@ -20,10 +20,66 @@ ...@@ -20,10 +20,66 @@
extern "C" { extern "C" {
#endif #endif
#include "sclfunc.h" typedef double (*_getDoubleValue_fn_t)(void *src, int32_t index);
typedef double (*_mathFunc)(double, double, bool *); static FORCE_INLINE double getVectorDoubleValue_TINYINT(void *src, int32_t index) {
return (double)*((int8_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_UTINYINT(void *src, int32_t index) {
return (double)*((uint8_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_SMALLINT(void *src, int32_t index) {
return (double)*((int16_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_USMALLINT(void *src, int32_t index) {
return (double)*((uint16_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_INT(void *src, int32_t index) {
return (double)*((int32_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_UINT(void *src, int32_t index) {
return (double)*((uint32_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_BIGINT(void *src, int32_t index) {
return (double)*((int64_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_UBIGINT(void *src, int32_t index) {
return (double)*((uint64_t *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_FLOAT(void *src, int32_t index) {
return (double)*((float *)src + index);
}
static FORCE_INLINE double getVectorDoubleValue_DOUBLE(void *src, int32_t index) {
return (double)*((double *)src + index);
}
static FORCE_INLINE _getDoubleValue_fn_t getVectorDoubleValueFn(int32_t srcType) {
_getDoubleValue_fn_t p = NULL;
if (srcType == TSDB_DATA_TYPE_TINYINT) {
p = getVectorDoubleValue_TINYINT;
} else if (srcType == TSDB_DATA_TYPE_UTINYINT) {
p = getVectorDoubleValue_UTINYINT;
} else if (srcType == TSDB_DATA_TYPE_SMALLINT) {
p = getVectorDoubleValue_SMALLINT;
} else if (srcType == TSDB_DATA_TYPE_USMALLINT) {
p = getVectorDoubleValue_USMALLINT;
} else if (srcType == TSDB_DATA_TYPE_INT) {
p = getVectorDoubleValue_INT;
} else if (srcType == TSDB_DATA_TYPE_UINT) {
p = getVectorDoubleValue_UINT;
} else if (srcType == TSDB_DATA_TYPE_BIGINT) {
p = getVectorDoubleValue_BIGINT;
} else if (srcType == TSDB_DATA_TYPE_UBIGINT) {
p = getVectorDoubleValue_UBIGINT;
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
p = getVectorDoubleValue_FLOAT;
} else if (srcType == TSDB_DATA_TYPE_DOUBLE) {
p = getVectorDoubleValue_DOUBLE;
} else {
assert(0);
}
return p;
}
typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType); typedef void (*_bufConverteFunc)(char *buf, SScalarParam* pOut, int32_t outType);
typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *output, int32_t order); typedef void (*_bin_scalar_fn_t)(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *output, int32_t order);
......
...@@ -1021,26 +1021,21 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) { ...@@ -1021,26 +1021,21 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
if (node->opType == OP_TYPE_IN && (!IS_VAR_DATA_TYPE(type))) { if (node->opType == OP_TYPE_IN && (!IS_VAR_DATA_TYPE(type))) {
SNodeListNode *listNode = (SNodeListNode *)node->pRight; SNodeListNode *listNode = (SNodeListNode *)node->pRight;
SListCell *cell = listNode->pNodeList->pHead; SListCell *cell = listNode->pNodeList->pHead;
SScalarParam in = {.num = 1}, out = {.num = 1, .type = type};
SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
out.columnData->info.type = type;
for (int32_t i = 0; i < listNode->pNodeList->length; ++i) { for (int32_t i = 0; i < listNode->pNodeList->length; ++i) {
SValueNode *valueNode = (SValueNode *)cell->pNode; SValueNode *valueNode = (SValueNode *)cell->pNode;
in.type = valueNode->node.resType.type; code = doConvertDataType(valueNode, &out);
in.bytes = valueNode->node.resType.bytes;
in.data = nodesGetValueFromNode(valueNode);
out.data = taosMemoryMalloc(sizeof(int64_t));
code = vectorConvertImpl(&in, &out);
if (code) { if (code) {
fltError("convert from %d to %d failed", in.type, out.type); // fltError("convert from %d to %d failed", in.type, out.type);
taosMemoryFreeClear(out.data);
FLT_ERR_RET(code); FLT_ERR_RET(code);
} }
len = tDataTypes[type].bytes; len = tDataTypes[type].bytes;
filterAddField(info, NULL, &out.data, FLD_TYPE_VALUE, &right, len, true); filterAddField(info, NULL, (void**) &out.columnData->pData, FLD_TYPE_VALUE, &right, len, true);
filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx); filterAddUnit(info, OP_TYPE_EQUAL, &left, &right, &uidx);
SFilterGroup fgroup = {0}; SFilterGroup fgroup = {0};
...@@ -1054,7 +1049,6 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) { ...@@ -1054,7 +1049,6 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode* tree, SArray *group) {
filterAddFieldFromNode(info, node->pRight, &right); filterAddFieldFromNode(info, node->pRight, &right);
FLT_ERR_RET(filterAddUnit(info, node->opType, &left, &right, &uidx)); FLT_ERR_RET(filterAddUnit(info, node->opType, &left, &right, &uidx));
SFilterGroup fgroup = {0}; SFilterGroup fgroup = {0};
filterAddUnitToGroup(&fgroup, uidx); filterAddUnitToGroup(&fgroup, uidx);
...@@ -1080,7 +1074,6 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u ...@@ -1080,7 +1074,6 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES, false); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right. filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, POINTER_BYTES, false); // POINTER_BYTES should be sizeof(SHashObj), but POINTER_BYTES is also right.
t = FILTER_GET_FIELD(dst, right); t = FILTER_GET_FIELD(dst, right);
FILTER_SET_FLAG(t->flag, FLD_DATA_IS_HASH); FILTER_SET_FLAG(t->flag, FLD_DATA_IS_HASH);
} else { } else {
filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false); filterAddField(dst, NULL, &data, FLD_TYPE_VALUE, &right, varDataTLen(data), false);
...@@ -1101,14 +1094,12 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u ...@@ -1101,14 +1094,12 @@ int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit* u
int32_t filterAddUnitRight(SFilterInfo *info, uint8_t optr, SFilterFieldId *right, uint32_t uidx) { int32_t filterAddUnitRight(SFilterInfo *info, uint8_t optr, SFilterFieldId *right, uint32_t uidx) {
SFilterUnit *u = &info->units[uidx]; SFilterUnit *u = &info->units[uidx];
u->compare.optr2 = optr; u->compare.optr2 = optr;
u->right2 = *right; u->right2 = *right;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRangeCtx *ctx, uint32_t cidx, SFilterGroup *g, int32_t optr, SArray *res) { int32_t filterAddGroupUnitFromCtx(SFilterInfo *dst, SFilterInfo *src, SFilterRangeCtx *ctx, uint32_t cidx, SFilterGroup *g, int32_t optr, SArray *res) {
SFilterFieldId left, right, right2; SFilterFieldId left, right, right2;
uint32_t uidx = 0; uint32_t uidx = 0;
...@@ -1800,9 +1791,12 @@ int32_t fltInitValFieldData(SFilterInfo *info) { ...@@ -1800,9 +1791,12 @@ int32_t fltInitValFieldData(SFilterInfo *info) {
if (dType->type == type) { if (dType->type == type) {
assignVal(fi->data, nodesGetValueFromNode(var), dType->bytes, type); assignVal(fi->data, nodesGetValueFromNode(var), dType->bytes, type);
} else { } else {
SScalarParam in = {.data = nodesGetValueFromNode(var), .num = 1, .type = dType->type, .bytes = dType->bytes}; SScalarParam out = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
SScalarParam out = {.data = fi->data, .num = 1, .type = type}; out.columnData->info.type = type;
if (vectorConvertImpl(&in, &out)) {
// todo refactor the convert
int32_t code = doConvertDataType(var, &out);
if (code != TSDB_CODE_SUCCESS) {
qError("convert value to type[%d] failed", type); qError("convert value to type[%d] failed", type);
return TSDB_CODE_TSC_INVALID_OPERATION; return TSDB_CODE_TSC_INVALID_OPERATION;
} }
...@@ -3636,7 +3630,7 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) ...@@ -3636,7 +3630,7 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
if (*pInfo == NULL) { if (*pInfo == NULL) {
*pInfo = taosMemoryCalloc(1, sizeof(SFilterInfo)); *pInfo = taosMemoryCalloc(1, sizeof(SFilterInfo));
if (NULL == *pInfo) { if (NULL == *pInfo) {
fltError("calloc %d failed", (int32_t)sizeof(SFilterInfo)); fltError("taosMemoryCalloc %d failed", (int32_t)sizeof(SFilterInfo));
FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); FLT_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
} }
...@@ -3676,18 +3670,18 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData ...@@ -3676,18 +3670,18 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output)); FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
taosArrayDestroy(pList); taosArrayDestroy(pList);
// TODO Fix it
*p = output.orig.data; // *p = output.orig.data;
output.orig.data = NULL; // output.orig.data = NULL;
//
sclFreeParam(&output); // sclFreeParam(&output);
//
int8_t *r = output.data; // int8_t *r = output.data;
for (int32_t i = 0; i < output.num; ++i) { // for (int32_t i = 0; i < output.num; ++i) {
if (0 == *(r+i)) { // if (0 == *(r+i)) {
return false; // return false;
} // }
} // }
return true; return true;
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
...@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->sinkType == TASK_SINK__TABLE) { if (pTask->sinkType == TASK_SINK__TABLE) {
// //
} else if (pTask->sinkType == TASK_SINK__SMA) { } else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
// //
} else if (pTask->sinkType == TASK_SINK__FETCH) { } else if (pTask->sinkType == TASK_SINK__FETCH) {
// //
...@@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -205,9 +206,16 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
// TODO: wrap
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
...@@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { ...@@ -244,8 +252,16 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
} }
if (pTask->sinkType != TASK_SINK__NONE) { if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
......
...@@ -8,6 +8,8 @@ target_sources(tdb ...@@ -8,6 +8,8 @@ target_sources(tdb
"src/db/tdbBtree.c" "src/db/tdbBtree.c"
"src/db/tdbDb.c" "src/db/tdbDb.c"
"src/db/tdbEnv.c" "src/db/tdbEnv.c"
"src/db/tdbTxn.c"
"src/db/tdbOs.c"
"src/page/tdbPage.c" "src/page/tdbPage.c"
"src/page/tdbPageL.c" "src/page/tdbPageL.c"
) )
......
...@@ -22,43 +22,6 @@ ...@@ -22,43 +22,6 @@
extern "C" { extern "C" {
#endif #endif
// typedef struct STDb TDB;
// typedef struct STDbEnv TENV;
// typedef struct STDbCurosr TDBC;
// typedef int32_t pgsz_t;
// typedef int32_t cachesz_t;
// typedef int (*TdbKeyCmprFn)(int keyLen1, const void *pKey1, int keyLen2, const void *pKey2);
// // TEVN
// int tdbEnvCreate(TENV **ppEnv, const char *rootDir);
// int tdbEnvOpen(TENV *ppEnv);
// int tdbEnvClose(TENV *pEnv);
// int tdbEnvSetCache(TENV *pEnv, pgsz_t pgSize, cachesz_t cacheSize);
// pgsz_t tdbEnvGetPageSize(TENV *pEnv);
// cachesz_t tdbEnvGetCacheSize(TENV *pEnv);
// int tdbEnvBeginTxn(TENV *pEnv);
// int tdbEnvCommit(TENV *pEnv);
// // TDB
// int tdbCreate(TDB **ppDb);
// int tdbOpen(TDB *pDb, const char *fname, const char *dbname, TENV *pEnv);
// int tdbClose(TDB *pDb);
// int tdbDrop(TDB *pDb);
// int tdbSetKeyLen(TDB *pDb, int klen);
// int tdbSetValLen(TDB *pDb, int vlen);
// int tdbSetDup(TDB *pDb, int dup);
// int tdbSetCmprFunc(TDB *pDb, TdbKeyCmprFn fn);
// int tdbGetKeyLen(TDB *pDb);
// int tdbGetValLen(TDB *pDb);
// int tdbGetDup(TDB *pDb);
// int tdbInsert(TDB *pDb, const void *pKey, int nKey, const void *pData, int nData);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -67,7 +67,7 @@ typedef struct { ...@@ -67,7 +67,7 @@ typedef struct {
u8 *pTmpSpace; u8 *pTmpSpace;
} SCellDecoder; } SCellDecoder;
static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst); static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst);
static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2); static int tdbDefaultKeyCmprFn(const void *pKey1, int keyLen1, const void *pKey2, int keyLen2);
static int tdbBtreeOpenImpl(SBTree *pBt); static int tdbBtreeOpenImpl(SBTree *pBt);
static int tdbBtreeZeroPage(SPage *pPage, void *arg); static int tdbBtreeZeroPage(SPage *pPage, void *arg);
...@@ -75,10 +75,10 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg); ...@@ -75,10 +75,10 @@ static int tdbBtreeInitPage(SPage *pPage, void *arg);
static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell, static int tdbBtreeEncodeCell(SPage *pPage, const void *pKey, int kLen, const void *pVal, int vLen, SCell *pCell,
int *szCell); int *szCell);
static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder); static int tdbBtreeDecodeCell(SPage *pPage, const SCell *pCell, SCellDecoder *pDecoder);
static int tdbBtreeBalance(SBTC *pCur); static int tdbBtreeBalance(SBTC *pBtc);
static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell); static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell);
static int tdbBtcMoveToNext(SBTC *pBtc); static int tdbBtcMoveToNext(SBTC *pBtc);
static int tdbBtcMoveDownward(SBTC *pCur, SPgno pgno); static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno);
static int tdbBtcMoveUpward(SBTC *pBtc); static int tdbBtcMoveUpward(SBTC *pBtc);
int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, SBTree **ppBt) { int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, SBTree **ppBt) {
...@@ -87,7 +87,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S ...@@ -87,7 +87,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
*ppBt = NULL; *ppBt = NULL;
pBt = (SBTree *)calloc(1, sizeof(*pBt)); pBt = (SBTree *)tdbOsCalloc(1, sizeof(*pBt));
if (pBt == NULL) { if (pBt == NULL) {
return -1; return -1;
} }
...@@ -121,7 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S ...@@ -121,7 +121,7 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, FKeyComparator kcmpr, S
// TODO: pBt->root // TODO: pBt->root
ret = tdbBtreeOpenImpl(pBt); ret = tdbBtreeOpenImpl(pBt);
if (ret < 0) { if (ret < 0) {
free(pBt); tdbOsFree(pBt);
return -1; return -1;
} }
...@@ -134,7 +134,7 @@ int tdbBtreeClose(SBTree *pBt) { ...@@ -134,7 +134,7 @@ int tdbBtreeClose(SBTree *pBt) {
return 0; return 0;
} }
int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, int vLen) { int tdbBtCursorInsert(SBTC *pBtc, const void *pKey, int kLen, const void *pVal, int vLen) {
int ret; int ret;
int idx; int idx;
SPager *pPager; SPager *pPager;
...@@ -143,20 +143,20 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, ...@@ -143,20 +143,20 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal,
int cret; int cret;
SBTree *pBt; SBTree *pBt;
ret = tdbBtCursorMoveTo(pCur, pKey, kLen, &cret); ret = tdbBtCursorMoveTo(pBtc, pKey, kLen, &cret);
if (ret < 0) { if (ret < 0) {
// TODO: handle error // TODO: handle error
return -1; return -1;
} }
if (pCur->idx == -1) { if (pBtc->idx == -1) {
ASSERT(TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0); ASSERT(TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0);
idx = 0; idx = 0;
} else { } else {
if (cret > 0) { if (cret > 0) {
idx = pCur->idx + 1; idx = pBtc->idx + 1;
} else if (cret < 0) { } else if (cret < 0) {
idx = pCur->idx; idx = pBtc->idx;
} else { } else {
/* TODO */ /* TODO */
ASSERT(0); ASSERT(0);
...@@ -164,9 +164,9 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, ...@@ -164,9 +164,9 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal,
} }
// TODO: refact code here // TODO: refact code here
pBt = pCur->pBt; pBt = pBtc->pBt;
if (!pBt->pTmp) { if (!pBt->pTmp) {
pBt->pTmp = (u8 *)malloc(pBt->pageSize); pBt->pTmp = (u8 *)tdbOsMalloc(pBt->pageSize);
if (pBt->pTmp == NULL) { if (pBt->pTmp == NULL) {
return -1; return -1;
} }
...@@ -175,20 +175,20 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal, ...@@ -175,20 +175,20 @@ int tdbBtCursorInsert(SBTC *pCur, const void *pKey, int kLen, const void *pVal,
pCell = pBt->pTmp; pCell = pBt->pTmp;
// Encode the cell // Encode the cell
ret = tdbBtreeEncodeCell(pCur->pPage, pKey, kLen, pVal, vLen, pCell, &szCell); ret = tdbBtreeEncodeCell(pBtc->pPage, pKey, kLen, pVal, vLen, pCell, &szCell);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
// Insert the cell to the index // Insert the cell to the index
ret = tdbPageInsertCell(pCur->pPage, idx, pCell, szCell, 0); ret = tdbPageInsertCell(pBtc->pPage, idx, pCell, szCell, 0);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
// If page is overflow, balance the tree // If page is overflow, balance the tree
if (pCur->pPage->nOverflow > 0) { if (pBtc->pPage->nOverflow > 0) {
ret = tdbBtreeBalance(pCur); ret = tdbBtreeBalance(pBtc);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
...@@ -226,30 +226,30 @@ int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen ...@@ -226,30 +226,30 @@ int tdbBtreeGet(SBTree *pBt, const void *pKey, int kLen, void **ppVal, int *vLen
return 0; return 0;
} }
static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst) { static int tdbBtCursorMoveTo(SBTC *pBtc, const void *pKey, int kLen, int *pCRst) {
int ret; int ret;
SBTree *pBt; SBTree *pBt;
SPager *pPager; SPager *pPager;
pBt = pCur->pBt; pBt = pBtc->pBt;
pPager = pBt->pPager; pPager = pBt->pPager;
if (pCur->iPage < 0) { if (pBtc->iPage < 0) {
ASSERT(pCur->iPage == -1); ASSERT(pBtc->iPage == -1);
ASSERT(pCur->idx == -1); ASSERT(pBtc->idx == -1);
// Move from the root // Move from the root
ret = tdbPagerFetchPage(pPager, pBt->root, &(pCur->pPage), tdbBtreeInitPage, pBt); ret = tdbPagerFetchPage(pPager, pBt->root, &(pBtc->pPage), tdbBtreeInitPage, pBt);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
return -1; return -1;
} }
pCur->iPage = 0; pBtc->iPage = 0;
if (TDB_PAGE_TOTAL_CELLS(pCur->pPage) == 0) { if (TDB_PAGE_TOTAL_CELLS(pBtc->pPage) == 0) {
// Current page is empty // Current page is empty
// ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pCur->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF)); // ASSERT(TDB_FLAG_IS(TDB_PAGE_FLAGS(pBtc->pPage), TDB_BTREE_ROOT | TDB_BTREE_LEAF));
return 0; return 0;
} }
...@@ -259,7 +259,7 @@ static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst) ...@@ -259,7 +259,7 @@ static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst)
SPage *pPage; SPage *pPage;
SCellDecoder cd = {0}; SCellDecoder cd = {0};
pPage = pCur->pPage; pPage = pBtc->pPage;
nCells = TDB_PAGE_TOTAL_CELLS(pPage); nCells = TDB_PAGE_TOTAL_CELLS(pPage);
lidx = 0; lidx = 0;
ridx = nCells - 1; ridx = nCells - 1;
...@@ -297,22 +297,22 @@ static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst) ...@@ -297,22 +297,22 @@ static int tdbBtCursorMoveTo(SBTC *pCur, const void *pKey, int kLen, int *pCRst)
u8 flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); u8 flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags); u8 leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
if (leaf) { if (leaf) {
pCur->idx = midx; pBtc->idx = midx;
*pCRst = c; *pCRst = c;
break; break;
} else { } else {
if (c <= 0) { if (c <= 0) {
pCur->idx = midx; pBtc->idx = midx;
tdbBtcMoveDownward(pCur, cd.pgno); tdbBtcMoveDownward(pBtc, cd.pgno);
} else { } else {
pCur->idx = midx + 1; pBtc->idx = midx + 1;
if (midx == nCells - 1) { if (midx == nCells - 1) {
/* Move to right-most child */ /* Move to right-most child */
tdbBtcMoveDownward(pCur, ((SIntHdr *)pCur->pPage->pData)->pgno); tdbBtcMoveDownward(pBtc, ((SIntHdr *)pBtc->pPage->pData)->pgno);
} else { } else {
pCell = tdbPageGetCell(pPage, pCur->idx); pCell = tdbPageGetCell(pPage, pBtc->idx);
tdbBtreeDecodeCell(pPage, pCell, &cd); tdbBtreeDecodeCell(pPage, pCell, &cd);
tdbBtcMoveDownward(pCur, cd.pgno); tdbBtcMoveDownward(pBtc, cd.pgno);
} }
} }
} }
...@@ -550,7 +550,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -550,7 +550,7 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) { if (sIdx + i < TDB_PAGE_TOTAL_CELLS(pParent)) {
pCell = tdbPageGetCell(pParent, sIdx + i); pCell = tdbPageGetCell(pParent, sIdx + i);
szDivCell[i] = tdbBtreeCellSize(pParent, pCell); szDivCell[i] = tdbBtreeCellSize(pParent, pCell);
pDivCell[i] = malloc(szDivCell[i]); pDivCell[i] = tdbOsMalloc(szDivCell[i]);
memcpy(pDivCell[i], pCell, szDivCell[i]); memcpy(pDivCell[i], pCell, szDivCell[i]);
} }
...@@ -740,13 +740,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -740,13 +740,13 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
tdbBtreeDecodeCell(pPage, pCell, &cd); tdbBtreeDecodeCell(pPage, pCell, &cd);
// TODO: pCell here may be inserted as an overflow cell, handle it // TODO: pCell here may be inserted as an overflow cell, handle it
SCell *pNewCell = malloc(cd.kLen + 9); SCell *pNewCell = tdbOsMalloc(cd.kLen + 9);
int szNewCell; int szNewCell;
SPgno pgno; SPgno pgno;
pgno = TDB_PAGE_PGNO(pNews[iNew]); pgno = TDB_PAGE_PGNO(pNews[iNew]);
tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell); tdbBtreeEncodeCell(pParent, cd.pKey, cd.kLen, (void *)&pgno, sizeof(SPgno), pNewCell, &szNewCell);
tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0);
free(pNewCell); tdbOsFree(pNewCell);
} }
// move to next new page // move to next new page
...@@ -798,14 +798,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) { ...@@ -798,14 +798,14 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx) {
for (int i = 0; i < 3; i++) { for (int i = 0; i < 3; i++) {
if (pDivCell[i]) { if (pDivCell[i]) {
free(pDivCell[i]); tdbOsFree(pDivCell[i]);
} }
} }
return 0; return 0;
} }
static int tdbBtreeBalance(SBTC *pCur) { static int tdbBtreeBalance(SBTC *pBtc) {
int iPage; int iPage;
SPage *pParent; SPage *pParent;
SPage *pPage; SPage *pPage;
...@@ -816,8 +816,8 @@ static int tdbBtreeBalance(SBTC *pCur) { ...@@ -816,8 +816,8 @@ static int tdbBtreeBalance(SBTC *pCur) {
// Main loop to balance the BTree // Main loop to balance the BTree
for (;;) { for (;;) {
iPage = pCur->iPage; iPage = pBtc->iPage;
pPage = pCur->pPage; pPage = pBtc->pPage;
flags = TDB_BTREE_PAGE_GET_FLAGS(pPage); flags = TDB_BTREE_PAGE_GET_FLAGS(pPage);
leaf = TDB_BTREE_PAGE_IS_LEAF(flags); leaf = TDB_BTREE_PAGE_IS_LEAF(flags);
root = TDB_BTREE_PAGE_IS_ROOT(flags); root = TDB_BTREE_PAGE_IS_ROOT(flags);
...@@ -833,27 +833,27 @@ static int tdbBtreeBalance(SBTC *pCur) { ...@@ -833,27 +833,27 @@ static int tdbBtreeBalance(SBTC *pCur) {
// ignore the case of empty // ignore the case of empty
if (pPage->nOverflow == 0) break; if (pPage->nOverflow == 0) break;
ret = tdbBtreeBalanceDeeper(pCur->pBt, pPage, &(pCur->pgStack[1])); ret = tdbBtreeBalanceDeeper(pBtc->pBt, pPage, &(pBtc->pgStack[1]));
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
pCur->idx = 0; pBtc->idx = 0;
pCur->idxStack[0] = 0; pBtc->idxStack[0] = 0;
pCur->pgStack[0] = pCur->pPage; pBtc->pgStack[0] = pBtc->pPage;
pCur->iPage = 1; pBtc->iPage = 1;
pCur->pPage = pCur->pgStack[1]; pBtc->pPage = pBtc->pgStack[1];
} else { } else {
// Generalized balance step // Generalized balance step
pParent = pCur->pgStack[iPage - 1]; pParent = pBtc->pgStack[iPage - 1];
ret = tdbBtreeBalanceNonRoot(pCur->pBt, pParent, pCur->idxStack[pCur->iPage - 1]); ret = tdbBtreeBalanceNonRoot(pBtc->pBt, pParent, pBtc->idxStack[pBtc->iPage - 1]);
if (ret < 0) { if (ret < 0) {
return -1; return -1;
} }
pCur->iPage--; pBtc->iPage--;
pCur->pPage = pCur->pgStack[pCur->iPage]; pBtc->pPage = pBtc->pgStack[pBtc->iPage];
} }
} }
...@@ -1050,11 +1050,11 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) { ...@@ -1050,11 +1050,11 @@ static int tdbBtreeCellSize(const SPage *pPage, SCell *pCell) {
#endif #endif
int tdbBtcOpen(SBTC *pCur, SBTree *pBt) { int tdbBtcOpen(SBTC *pBtc, SBTree *pBt) {
pCur->pBt = pBt; pBtc->pBt = pBt;
pCur->iPage = -1; pBtc->iPage = -1;
pCur->pPage = NULL; pBtc->pPage = NULL;
pCur->idx = -1; pBtc->idx = -1;
return 0; return 0;
} }
...@@ -1262,16 +1262,16 @@ int tdbBtcClose(SBTC *pBtc) { ...@@ -1262,16 +1262,16 @@ int tdbBtcClose(SBTC *pBtc) {
return 0; return 0;
} }
static int tdbBtcMoveDownward(SBTC *pCur, SPgno pgno) { static int tdbBtcMoveDownward(SBTC *pBtc, SPgno pgno) {
int ret; int ret;
pCur->pgStack[pCur->iPage] = pCur->pPage; pBtc->pgStack[pBtc->iPage] = pBtc->pPage;
pCur->idxStack[pCur->iPage] = pCur->idx; pBtc->idxStack[pBtc->iPage] = pBtc->idx;
pCur->iPage++; pBtc->iPage++;
pCur->pPage = NULL; pBtc->pPage = NULL;
pCur->idx = -1; pBtc->idx = -1;
ret = tdbPagerFetchPage(pCur->pBt->pPager, pgno, &pCur->pPage, tdbBtreeInitPage, pCur->pBt); ret = tdbPagerFetchPage(pBtc->pBt->pPager, pgno, &pBtc->pPage, tdbBtreeInitPage, pBtc->pBt);
if (ret < 0) { if (ret < 0) {
ASSERT(0); ASSERT(0);
} }
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
int tdbTxnBegin(TENV *pEnv) {
// TODO
return 0;
}
int tdbTxnCommit(TENV *pEnv) {
// TODO
return 0;
}
int tdbTxnRollback(TENV *pEnv) {
// TODO
return 0;
}
\ No newline at end of file
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册