提交 88f0a0b3 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into fix/hzcheng_3.0

......@@ -240,6 +240,7 @@ typedef struct SNodeList {
#define SNodeptr void*
int32_t nodesNodeSize(ENodeType type);
SNodeptr nodesMakeNode(ENodeType type);
void nodesDestroyNode(SNodeptr pNode);
......
......@@ -48,6 +48,7 @@ typedef struct SExprNode {
ENodeType type;
SDataType resType;
char aliasName[TSDB_COL_NAME_LEN];
char userAlias[TSDB_COL_NAME_LEN];
SArray* pAssociation;
} SExprNode;
......@@ -325,7 +326,7 @@ typedef struct SQuery {
bool showRewrite;
int32_t placeholderNum;
SArray* pPlaceholderValues;
SNode* pContainPlaceholderRoot;
SNode* pPrepareRoot;
} SQuery;
void nodesWalkSelectStmt(SSelectStmt* pSelect, ESqlClause clause, FNodeWalker walker, void* pContext);
......
......@@ -1314,6 +1314,90 @@ int32_t doSetupUdf(char udfName[], UdfcFuncHandle *funcHandle) {
return err;
}
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t acquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
int32_t stubIndex = taosArraySearchIdx(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (stubIndex != -1) {
SUdfcFuncStub *foundStub = taosArrayGet(gUdfdProxy.udfStubs, stubIndex);
UdfcFuncHandle handle = foundStub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
udfName, foundStub->refCount, foundStub->lastRefTime);
taosArrayRemove(gUdfdProxy.udfStubs, stubIndex);
}
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
*pHandle = NULL;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
ASSERT(foundStub);
--foundStub->refCount;
ASSERT(foundStub->refCount>=0);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
UdfcFuncHandle handle = stub->handle;
if (handle != NULL && ((SUdfcUvSession*)handle)->udfUvPipe != NULL) {
taosArrayPush(udfStubs, stub);
} else {
fnInfo("invalid handle for %s, refCount: %d, last ref time: %"PRId64". remove it from cache",
stub->udfName, stub->refCount, stub->lastRefTime);
}
}
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
int32_t callUdf(UdfcFuncHandle handle, int8_t callType, SSDataBlock *input, SUdfInterBuf *state, SUdfInterBuf *state2,
SSDataBlock* output, SUdfInterBuf *newState) {
fnTrace("udfc call udf. callType: %d, funcHandle: %p", callType, handle);
......@@ -1437,57 +1521,10 @@ int32_t doCallUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t
return err;
}
int compareUdfcFuncSub(const void* elem1, const void* elem2) {
SUdfcFuncStub *stub1 = (SUdfcFuncStub *)elem1;
SUdfcFuncStub *stub2 = (SUdfcFuncStub *)elem2;
return strcmp(stub1->udfName, stub2->udfName);
}
int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
int32_t code = 0;
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
if (foundStub != NULL) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle;
++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
return 0;
}
*pHandle = NULL;
code = doSetupUdf(udfName, pHandle);
if (code == TSDB_CODE_SUCCESS) {
SUdfcFuncStub stub = {0};
strcpy(stub.udfName, udfName);
stub.handle = *pHandle;
++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else {
*pHandle = NULL;
}
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return code;
}
void releaseUdfFuncHandle(char* udfName) {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
SUdfcFuncStub key = {0};
strcpy(key.udfName, udfName);
SUdfcFuncStub *foundStub = taosArraySearch(gUdfdProxy.udfStubs, &key, compareUdfcFuncSub, TD_EQ);
ASSERT(foundStub);
--foundStub->refCount;
ASSERT(foundStub->refCount>=0);
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
}
int32_t callUdfScalarFunc(char *udfName, SScalarParam *input, int32_t numOfCols, SScalarParam *output) {
UdfcFuncHandle handle = NULL;
int32_t code = accquireUdfFuncHandle(udfName, &handle);
int32_t code = acquireUdfFuncHandle(udfName, &handle);
if (code != 0) {
return code;
}
......@@ -1549,7 +1586,7 @@ bool udfAggInit(struct SqlFunctionCtx *pCtx, struct SResultRowEntryInfo* pResult
}
UdfcFuncHandle handle;
int32_t udfCode = 0;
if ((udfCode = accquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
if ((udfCode = acquireUdfFuncHandle((char *)pCtx->udfName, &handle)) != 0) {
fnError("udfAggInit error. step doSetupUdf. udf code: %d", udfCode);
return false;
}
......@@ -1662,25 +1699,3 @@ int32_t udfAggFinalize(struct SqlFunctionCtx *pCtx, SSDataBlock* pBlock) {
releaseUdfFuncHandle(pCtx->udfName);
return udfCallCode == 0 ? numOfResults : udfCallCode;
}
int32_t cleanUpUdfs() {
uv_mutex_lock(&gUdfdProxy.udfStubsMutex);
int32_t i = 0;
SArray* udfStubs = taosArrayInit(16, sizeof(SUdfcFuncStub));
while (i < taosArrayGetSize(gUdfdProxy.udfStubs)) {
SUdfcFuncStub *stub = taosArrayGet(gUdfdProxy.udfStubs, i);
if (stub->refCount == 0) {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle);
} else {
fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
taosArrayPush(udfStubs, stub);
}
++i;
}
taosArrayDestroy(gUdfdProxy.udfStubs);
gUdfdProxy.udfStubs = udfStubs;
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
return 0;
}
\ No newline at end of file
......@@ -86,17 +86,148 @@ typedef struct SUdf {
TUdfDestroyFunc destroyFunc;
} SUdf;
// TODO: low priority: change name onxxx to xxxCb, and udfc or udfd as prefix
// TODO: add private udf structure.
typedef struct SUdfcFuncHandle {
SUdf *udf;
} SUdfcFuncHandle;
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf);
typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdConnectToMnode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
strcpy(udf->name, udfName);
int32_t err = 0;
err = udfdFillUdfInfoFromMNode(global.clientRpc, udf->name, udf);
if (err != 0) {
fnError("can not retrieve udf from mnode. udf name %s", udfName);
......@@ -515,140 +646,6 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop);
}
typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
tFreeSFuncInfo(pFuncInfo);
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdConnectToMNode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
......@@ -884,7 +881,18 @@ int main(int argc, char *argv[]) {
return -3;
}
if (udfdConnectToMNode() != 0) {
int32_t retryMnodeTimes = 0;
int32_t code = 0;
while (retryMnodeTimes++ < TSDB_MAX_REPLICA) {
uv_sleep(500 * ( 1 << retryMnodeTimes));
code = udfdConnectToMnode();
if (code == 0) {
break;
}
fnError("can not connect to mnode, code: %s. retry", tstrerror(code));
}
if (code != 0) {
fnError("failed to start since can not connect to mnode");
return -4;
}
......
......@@ -41,41 +41,41 @@ int scalarFuncTest() {
fnError("setup udf failure");
return -1;
}
SSDataBlock block = {0};
SSDataBlock *pBlock = &block;
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[4] = {0};
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
int64_t beg = taosGetTimestampUs();
for (int k = 0; k < 1; ++k) {
SSDataBlock block = {0};
SSDataBlock *pBlock = &block;
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 1024;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
taosArrayPush(pBlock->pDataBlock, &colInfo);
}
SScalarParam input = {0};
input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
doCallUdfScalarFunc(handle, &input, 1, &output);
taosArrayDestroy(pBlock->pDataBlock);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
SScalarParam input = {0};
input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
doCallUdfScalarFunc(handle, &input, 1, &output);
taosArrayDestroy(pBlock->pDataBlock);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
if (i % 100 == 0)
fprintf(stderr, "%d\t%d\n", i, *(int32_t *)(col->pData + i * sizeof(int32_t)));
}
colDataDestroy(output.columnData);
taosMemoryFree(output.columnData);
}
colDataDestroy(output.columnData);
taosMemoryFree(output.columnData);
int64_t end = taosGetTimestampUs();
fprintf(stderr, "time: %f\n", (end-beg)/1000.0);
doTeardownUdf(handle);
return 0;
......@@ -93,16 +93,13 @@ int aggregateFuncTest() {
SSDataBlock *pBlock = &block;
pBlock->pDataBlock = taosArrayInit(1, sizeof(SColumnInfoData));
pBlock->info.numOfCols = 1;
pBlock->info.rows = 4;
char data[16] = {0};
char bitmap[4] = {0};
pBlock->info.rows = 1024;
for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) {
SColumnInfoData colInfo = {0};
colInfo.info.type = TSDB_DATA_TYPE_INT;
colInfo.info.bytes = sizeof(int32_t);
colInfo.info.colId = 1;
colInfo.pData = data;
colInfo.nullbitmap = bitmap;
colInfoDataEnsureCapacity(&colInfo, 0, pBlock->info.rows);
for (int32_t j = 0; j < pBlock->info.rows; ++j) {
colDataAppendInt32(&colInfo, j, &j);
}
......
......@@ -19,21 +19,6 @@
#include "taos.h"
#include "taoserror.h"
#define COPY_ALL_SCALAR_FIELDS \
do { \
memcpy((pDst), (pSrc), sizeof(*pSrc)); \
} while (0)
#define COPY_SCALAR_FIELD(fldname) \
do { \
(pDst)->fldname = (pSrc)->fldname; \
} while (0)
#define COPY_CHAR_ARRAY_FIELD(fldname) \
do { \
strcpy((pDst)->fldname, (pSrc)->fldname); \
} while (0)
#define COPY_CHAR_POINT_FIELD(fldname) \
do { \
if (NULL == (pSrc)->fldname) { \
......@@ -85,34 +70,22 @@
} \
} while (0)
static void dataTypeCopy(const SDataType* pSrc, SDataType* pDst) {
COPY_SCALAR_FIELD(type);
COPY_SCALAR_FIELD(precision);
COPY_SCALAR_FIELD(scale);
COPY_SCALAR_FIELD(bytes);
}
static void dataTypeCopy(const SDataType* pSrc, SDataType* pDst) {}
static void exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) {
static SNode* exprNodeCopy(const SExprNode* pSrc, SExprNode* pDst) {
dataTypeCopy(&pSrc->resType, &pDst->resType);
COPY_CHAR_ARRAY_FIELD(aliasName);
pDst->pAssociation = NULL;
return (SNode*)pDst;
}
static SNode* columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_SCALAR_FIELD(colId);
COPY_SCALAR_FIELD(colType);
COPY_CHAR_ARRAY_FIELD(dbName);
COPY_CHAR_ARRAY_FIELD(tableName);
COPY_CHAR_ARRAY_FIELD(tableAlias);
COPY_CHAR_ARRAY_FIELD(colName);
COPY_SCALAR_FIELD(dataBlockId);
COPY_SCALAR_FIELD(slotId);
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
pDst->pProjectRef = NULL;
return (SNode*)pDst;
}
static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
COPY_CHAR_POINT_FIELD(literal);
if (!pSrc->translate) {
return (SNode*)pDst;
......@@ -139,57 +112,114 @@ static SNode* valueNodeCopy(const SValueNode* pSrc, SValueNode* pDst) {
}
static SNode* operatorNodeCopy(const SOperatorNode* pSrc, SOperatorNode* pDst) {
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_SCALAR_FIELD(opType);
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
CLONE_NODE_FIELD(pLeft);
CLONE_NODE_FIELD(pRight);
return (SNode*)pDst;
}
static SNode* logicConditionNodeCopy(const SLogicConditionNode* pSrc, SLogicConditionNode* pDst) {
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_SCALAR_FIELD(condType);
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
CLONE_NODE_LIST_FIELD(pParameterList);
return (SNode*)pDst;
}
static SNode* functionNodeCopy(const SFunctionNode* pSrc, SFunctionNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
exprNodeCopy((const SExprNode*)pSrc, (SExprNode*)pDst);
COPY_CHAR_ARRAY_FIELD(functionName);
COPY_SCALAR_FIELD(funcId);
COPY_SCALAR_FIELD(funcType);
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
CLONE_NODE_LIST_FIELD(pParameterList);
return (SNode*)pDst;
}
static SNode* tableNodeCopy(const STableNode* pSrc, STableNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, exprNodeCopy);
return (SNode*)pDst;
}
static STableMeta* tableMetaClone(const STableMeta* pSrc) {
int32_t len = TABLE_META_SIZE(pSrc);
STableMeta* pDst = taosMemoryMalloc(len);
if (NULL == pDst) {
return NULL;
}
memcpy(pDst, pSrc, len);
return pDst;
}
static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) {
int32_t len = VGROUPS_INFO_SIZE(pSrc);
SVgroupsInfo* pDst = taosMemoryMalloc(len);
if (NULL == pDst) {
return NULL;
}
memcpy(pDst, pSrc, len);
return pDst;
}
static SNode* realTableNodeCopy(const SRealTableNode* pSrc, SRealTableNode* pDst) {
COPY_BASE_OBJECT_FIELD(table, tableNodeCopy);
CLONE_OBJECT_FIELD(pMeta, tableMetaClone);
CLONE_OBJECT_FIELD(pVgroupList, vgroupsInfoClone);
return (SNode*)pDst;
}
static SNode* tempTableNodeCopy(const STempTableNode* pSrc, STempTableNode* pDst) {
COPY_BASE_OBJECT_FIELD(table, tableNodeCopy);
CLONE_NODE_FIELD(pSubquery);
return (SNode*)pDst;
}
static SNode* joinTableNodeCopy(const SJoinTableNode* pSrc, SJoinTableNode* pDst) {
COPY_BASE_OBJECT_FIELD(table, tableNodeCopy);
CLONE_NODE_FIELD(pLeft);
CLONE_NODE_FIELD(pRight);
CLONE_NODE_FIELD(pOnCond);
return (SNode*)pDst;
}
static SNode* targetNodeCopy(const STargetNode* pSrc, STargetNode* pDst) {
COPY_SCALAR_FIELD(dataBlockId);
COPY_SCALAR_FIELD(slotId);
CLONE_NODE_FIELD(pExpr);
return (SNode*)pDst;
}
static SNode* groupingSetNodeCopy(const SGroupingSetNode* pSrc, SGroupingSetNode* pDst) {
COPY_SCALAR_FIELD(groupingSetType);
CLONE_NODE_LIST_FIELD(pParameterList);
return (SNode*)pDst;
}
static SNode* orderByExprNodeCopy(const SOrderByExprNode* pSrc, SOrderByExprNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
CLONE_NODE_FIELD(pExpr);
return (SNode*)pDst;
}
static SNode* limitNodeCopy(const SLimitNode* pSrc, SLimitNode* pDst) { return (SNode*)pDst; }
static SNode* stateWindowNodeCopy(const SStateWindowNode* pSrc, SStateWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pExpr);
return (SNode*)pDst;
}
static SNode* sessionWindowNodeCopy(const SSessionWindowNode* pSrc, SSessionWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pGap);
return (SNode*)pDst;
}
static SNode* intervalWindowNodeCopy(const SIntervalWindowNode* pSrc, SIntervalWindowNode* pDst) {
CLONE_NODE_FIELD(pCol);
CLONE_NODE_FIELD(pInterval);
CLONE_NODE_FIELD(pOffset);
CLONE_NODE_FIELD(pSliding);
CLONE_NODE_FIELD(pFill);
return (SNode*)pDst;
}
static SNode* nodeListNodeCopy(const SNodeListNode* pSrc, SNodeListNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
CLONE_NODE_LIST_FIELD(pNodeList);
return (SNode*)pDst;
}
static SNode* fillNodeCopy(const SFillNode* pSrc, SFillNode* pDst) {
COPY_SCALAR_FIELD(mode);
CLONE_NODE_FIELD(pValues);
CLONE_NODE_FIELD(pWStartTs);
return (SNode*)pDst;
......@@ -199,31 +229,11 @@ static SNode* logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pTargets);
CLONE_NODE_FIELD(pConditions);
CLONE_NODE_LIST_FIELD(pChildren);
pDst->pParent = NULL;
return (SNode*)pDst;
}
static STableMeta* tableMetaClone(const STableMeta* pSrc) {
int32_t len = TABLE_META_SIZE(pSrc);
STableMeta* pDst = taosMemoryMalloc(len);
if (NULL == pDst) {
return NULL;
}
memcpy(pDst, pSrc, len);
return pDst;
}
static SVgroupsInfo* vgroupsInfoClone(const SVgroupsInfo* pSrc) {
int32_t len = VGROUPS_INFO_SIZE(pSrc);
SVgroupsInfo* pDst = taosMemoryMalloc(len);
if (NULL == pDst) {
return NULL;
}
memcpy(pDst, pSrc, len);
return pDst;
}
static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pScanCols);
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
......@@ -234,7 +244,6 @@ static SNode* logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
}
static SNode* logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_FIELD(pOnConditions);
return (SNode*)pDst;
......@@ -248,7 +257,6 @@ static SNode* logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
}
static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pProjections);
return (SNode*)pDst;
......@@ -256,18 +264,17 @@ static SNode* logicProjectCopy(const SProjectLogicNode* pSrc, SProjectLogicNode*
static SNode* logicVnodeModifCopy(const SVnodeModifLogicNode* pSrc, SVnodeModifLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(msgType);
pDst->pDataBlocks = NULL;
pDst->pVgDataBlocks = NULL;
return (SNode*)pDst;
}
static SNode* logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
COPY_SCALAR_FIELD(srcGroupId);
return (SNode*)pDst;
}
static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pFuncs);
CLONE_NODE_FIELD(pTspk);
......@@ -275,7 +282,6 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD
}
static SNode* logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_FIELD(pWStartTs);
CLONE_NODE_FIELD(pValues);
......@@ -296,29 +302,37 @@ static SNode* logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLogi
static SNode* logicSubplanCopy(const SLogicSubplan* pSrc, SLogicSubplan* pDst) {
CLONE_NODE_FIELD(pNode);
COPY_SCALAR_FIELD(subplanType);
pDst->pChildren = NULL;
pDst->pParents = NULL;
pDst->pVgroupList = NULL;
return (SNode*)pDst;
}
static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) {
COPY_ALL_SCALAR_FIELDS;
CLONE_NODE_LIST_FIELD(pSlots);
return (SNode*)pDst;
}
static SNode* slotDescCopy(const SSlotDescNode* pSrc, SSlotDescNode* pDst) {
COPY_SCALAR_FIELD(slotId);
dataTypeCopy(&pSrc->dataType, &pDst->dataType);
COPY_SCALAR_FIELD(reserve);
COPY_SCALAR_FIELD(output);
COPY_SCALAR_FIELD(tag);
return (SNode*)pDst;
}
static SNode* downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstreamSourceNode* pDst) {
COPY_SCALAR_FIELD(addr);
COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(schedId);
return (SNode*)pDst;
}
static SNode* selectStmtCopy(const SSelectStmt* pSrc, SSelectStmt* pDst) {
CLONE_NODE_LIST_FIELD(pProjectionList);
CLONE_NODE_FIELD(pFromTable);
CLONE_NODE_FIELD(pWhere);
CLONE_NODE_LIST_FIELD(pPartitionByList);
CLONE_NODE_FIELD(pWindow);
CLONE_NODE_LIST_FIELD(pGroupByList);
CLONE_NODE_FIELD(pHaving);
CLONE_NODE_LIST_FIELD(pOrderByList);
CLONE_NODE_FIELD(pLimit);
CLONE_NODE_FIELD(pLimit);
return (SNode*)pDst;
}
......@@ -331,6 +345,7 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(pDst, pNode, nodesNodeSize(nodeType(pNode)));
switch (nodeType(pNode)) {
case QUERY_NODE_COLUMN:
return columnNodeCopy((const SColumnNode*)pNode, (SColumnNode*)pDst);
......@@ -342,28 +357,38 @@ SNodeptr nodesCloneNode(const SNodeptr pNode) {
return logicConditionNodeCopy((const SLogicConditionNode*)pNode, (SLogicConditionNode*)pDst);
case QUERY_NODE_FUNCTION:
return functionNodeCopy((const SFunctionNode*)pNode, (SFunctionNode*)pDst);
case QUERY_NODE_TARGET:
return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst);
case QUERY_NODE_REAL_TABLE:
return realTableNodeCopy((const SRealTableNode*)pNode, (SRealTableNode*)pDst);
case QUERY_NODE_TEMP_TABLE:
return tempTableNodeCopy((const STempTableNode*)pNode, (STempTableNode*)pDst);
case QUERY_NODE_JOIN_TABLE:
break;
return joinTableNodeCopy((const SJoinTableNode*)pNode, (SJoinTableNode*)pDst);
case QUERY_NODE_GROUPING_SET:
return groupingSetNodeCopy((const SGroupingSetNode*)pNode, (SGroupingSetNode*)pDst);
case QUERY_NODE_ORDER_BY_EXPR:
return orderByExprNodeCopy((const SOrderByExprNode*)pNode, (SOrderByExprNode*)pDst);
case QUERY_NODE_LIMIT:
break;
return limitNodeCopy((const SLimitNode*)pNode, (SLimitNode*)pDst);
case QUERY_NODE_STATE_WINDOW:
return stateWindowNodeCopy((const SStateWindowNode*)pNode, (SStateWindowNode*)pDst);
case QUERY_NODE_SESSION_WINDOW:
return sessionWindowNodeCopy((const SSessionWindowNode*)pNode, (SSessionWindowNode*)pDst);
case QUERY_NODE_INTERVAL_WINDOW:
return intervalWindowNodeCopy((const SIntervalWindowNode*)pNode, (SIntervalWindowNode*)pDst);
case QUERY_NODE_NODE_LIST:
return nodeListNodeCopy((const SNodeListNode*)pNode, (SNodeListNode*)pDst);
case QUERY_NODE_FILL:
return fillNodeCopy((const SFillNode*)pNode, (SFillNode*)pDst);
case QUERY_NODE_TARGET:
return targetNodeCopy((const STargetNode*)pNode, (STargetNode*)pDst);
case QUERY_NODE_DATABLOCK_DESC:
return dataBlockDescCopy((const SDataBlockDescNode*)pNode, (SDataBlockDescNode*)pDst);
case QUERY_NODE_SLOT_DESC:
return slotDescCopy((const SSlotDescNode*)pNode, (SSlotDescNode*)pDst);
case QUERY_NODE_DOWNSTREAM_SOURCE:
return downstreamSourceCopy((const SDownstreamSourceNode*)pNode, (SDownstreamSourceNode*)pDst);
case QUERY_NODE_SELECT_STMT:
return selectStmtCopy((const SSelectStmt*)pNode, (SSelectStmt*)pDst);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return logicScanCopy((const SScanLogicNode*)pNode, (SScanLogicNode*)pDst);
case QUERY_NODE_LOGIC_PLAN_JOIN:
......
......@@ -20,13 +20,28 @@
if (a->fldname != b->fldname) return false; \
} while (0)
#define COMPARE_STRING(a, b) (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b))
#define COMPARE_STRING(a, b) (((a) != NULL && (b) != NULL) ? (strcmp((a), (b)) == 0) : (a) == (b))
#define COMPARE_VARDATA(a, b) \
(((a) != NULL && (b) != NULL) \
? (varDataLen((a)) == varDataLen((b)) && memcmp(varDataVal((a)), varDataVal((b)), varDataLen((a))) == 0) \
: (a) == (b))
#define COMPARE_STRING_FIELD(fldname) \
do { \
if (!COMPARE_STRING(a->fldname, b->fldname)) return false; \
} while (0)
#define COMPARE_VARDATA_FIELD(fldname) \
do { \
if (!COMPARE_VARDATA(a->fldname, b->fldname)) return false; \
} while (0)
#define COMPARE_OBJECT_FIELD(fldname, equalFunc) \
do { \
if (!equalFunc(a->fldname, b->fldname)) return false; \
} while (0)
#define COMPARE_NODE_FIELD(fldname) \
do { \
if (!nodesEqualNode(a->fldname, b->fldname)) return false; \
......@@ -59,6 +74,10 @@ static bool nodeNodeListEqual(const SNodeList* a, const SNodeList* b) {
return true;
}
static bool dataTypeEqual(SDataType a, SDataType b) {
return a.type == b.type && a.bytes == b.bytes && a.precision == b.precision && a.scale == b.scale;
}
static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
COMPARE_STRING_FIELD(dbName);
COMPARE_STRING_FIELD(tableName);
......@@ -67,7 +86,35 @@ static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
}
static bool valueNodeEqual(const SValueNode* a, const SValueNode* b) {
COMPARE_OBJECT_FIELD(node.resType, dataTypeEqual);
COMPARE_STRING_FIELD(literal);
switch (a->node.resType.type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
COMPARE_SCALAR_FIELD(typeData);
break;
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_NCHAR:
COMPARE_VARDATA_FIELD(datum.p);
break;
case TSDB_DATA_TYPE_JSON:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
return false;
default:
break;
}
return true;
}
......
......@@ -21,156 +21,147 @@
#include "taoserror.h"
#include "thash.h"
static SNode* makeNode(ENodeType type, size_t size) {
SNode* p = taosMemoryCalloc(1, size);
if (NULL == p) {
return NULL;
}
setNodeType(p, type);
return p;
}
SNodeptr nodesMakeNode(ENodeType type) {
int32_t nodesNodeSize(ENodeType type) {
switch (type) {
case QUERY_NODE_COLUMN:
return makeNode(type, sizeof(SColumnNode));
return sizeof(SColumnNode);
case QUERY_NODE_VALUE:
return makeNode(type, sizeof(SValueNode));
return sizeof(SValueNode);
case QUERY_NODE_OPERATOR:
return makeNode(type, sizeof(SOperatorNode));
return sizeof(SOperatorNode);
case QUERY_NODE_LOGIC_CONDITION:
return makeNode(type, sizeof(SLogicConditionNode));
return sizeof(SLogicConditionNode);
case QUERY_NODE_FUNCTION:
return makeNode(type, sizeof(SFunctionNode));
return sizeof(SFunctionNode);
case QUERY_NODE_REAL_TABLE:
return makeNode(type, sizeof(SRealTableNode));
return sizeof(SRealTableNode);
case QUERY_NODE_TEMP_TABLE:
return makeNode(type, sizeof(STempTableNode));
return sizeof(STempTableNode);
case QUERY_NODE_JOIN_TABLE:
return makeNode(type, sizeof(SJoinTableNode));
return sizeof(SJoinTableNode);
case QUERY_NODE_GROUPING_SET:
return makeNode(type, sizeof(SGroupingSetNode));
return sizeof(SGroupingSetNode);
case QUERY_NODE_ORDER_BY_EXPR:
return makeNode(type, sizeof(SOrderByExprNode));
return sizeof(SOrderByExprNode);
case QUERY_NODE_LIMIT:
return makeNode(type, sizeof(SLimitNode));
return sizeof(SLimitNode);
case QUERY_NODE_STATE_WINDOW:
return makeNode(type, sizeof(SStateWindowNode));
return sizeof(SStateWindowNode);
case QUERY_NODE_SESSION_WINDOW:
return makeNode(type, sizeof(SSessionWindowNode));
return sizeof(SSessionWindowNode);
case QUERY_NODE_INTERVAL_WINDOW:
return makeNode(type, sizeof(SIntervalWindowNode));
return sizeof(SIntervalWindowNode);
case QUERY_NODE_NODE_LIST:
return makeNode(type, sizeof(SNodeListNode));
return sizeof(SNodeListNode);
case QUERY_NODE_FILL:
return makeNode(type, sizeof(SFillNode));
return sizeof(SFillNode);
case QUERY_NODE_RAW_EXPR:
return makeNode(type, sizeof(SRawExprNode));
return sizeof(SRawExprNode);
case QUERY_NODE_TARGET:
return makeNode(type, sizeof(STargetNode));
return sizeof(STargetNode);
case QUERY_NODE_DATABLOCK_DESC:
return makeNode(type, sizeof(SDataBlockDescNode));
return sizeof(SDataBlockDescNode);
case QUERY_NODE_SLOT_DESC:
return makeNode(type, sizeof(SSlotDescNode));
return sizeof(SSlotDescNode);
case QUERY_NODE_COLUMN_DEF:
return makeNode(type, sizeof(SColumnDefNode));
return sizeof(SColumnDefNode);
case QUERY_NODE_DOWNSTREAM_SOURCE:
return makeNode(type, sizeof(SDownstreamSourceNode));
return sizeof(SDownstreamSourceNode);
case QUERY_NODE_DATABASE_OPTIONS:
return makeNode(type, sizeof(SDatabaseOptions));
return sizeof(SDatabaseOptions);
case QUERY_NODE_TABLE_OPTIONS:
return makeNode(type, sizeof(STableOptions));
return sizeof(STableOptions);
case QUERY_NODE_INDEX_OPTIONS:
return makeNode(type, sizeof(SIndexOptions));
return sizeof(SIndexOptions);
case QUERY_NODE_EXPLAIN_OPTIONS:
return makeNode(type, sizeof(SExplainOptions));
return sizeof(SExplainOptions);
case QUERY_NODE_STREAM_OPTIONS:
return makeNode(type, sizeof(SStreamOptions));
return sizeof(SStreamOptions);
case QUERY_NODE_TOPIC_OPTIONS:
return makeNode(type, sizeof(STopicOptions));
return sizeof(STopicOptions);
case QUERY_NODE_SET_OPERATOR:
return makeNode(type, sizeof(SSetOperator));
return sizeof(SSetOperator);
case QUERY_NODE_SELECT_STMT:
return makeNode(type, sizeof(SSelectStmt));
return sizeof(SSelectStmt);
case QUERY_NODE_VNODE_MODIF_STMT:
return makeNode(type, sizeof(SVnodeModifOpStmt));
return sizeof(SVnodeModifOpStmt);
case QUERY_NODE_CREATE_DATABASE_STMT:
return makeNode(type, sizeof(SCreateDatabaseStmt));
return sizeof(SCreateDatabaseStmt);
case QUERY_NODE_DROP_DATABASE_STMT:
return makeNode(type, sizeof(SDropDatabaseStmt));
return sizeof(SDropDatabaseStmt);
case QUERY_NODE_ALTER_DATABASE_STMT:
return makeNode(type, sizeof(SAlterDatabaseStmt));
return sizeof(SAlterDatabaseStmt);
case QUERY_NODE_CREATE_TABLE_STMT:
return makeNode(type, sizeof(SCreateTableStmt));
return sizeof(SCreateTableStmt);
case QUERY_NODE_CREATE_SUBTABLE_CLAUSE:
return makeNode(type, sizeof(SCreateSubTableClause));
return sizeof(SCreateSubTableClause);
case QUERY_NODE_CREATE_MULTI_TABLE_STMT:
return makeNode(type, sizeof(SCreateMultiTableStmt));
return sizeof(SCreateMultiTableStmt);
case QUERY_NODE_DROP_TABLE_CLAUSE:
return makeNode(type, sizeof(SDropTableClause));
return sizeof(SDropTableClause);
case QUERY_NODE_DROP_TABLE_STMT:
return makeNode(type, sizeof(SDropTableStmt));
return sizeof(SDropTableStmt);
case QUERY_NODE_DROP_SUPER_TABLE_STMT:
return makeNode(type, sizeof(SDropSuperTableStmt));
return sizeof(SDropSuperTableStmt);
case QUERY_NODE_ALTER_TABLE_STMT:
return makeNode(type, sizeof(SAlterTableStmt));
return sizeof(SAlterTableStmt);
case QUERY_NODE_CREATE_USER_STMT:
return makeNode(type, sizeof(SCreateUserStmt));
return sizeof(SCreateUserStmt);
case QUERY_NODE_ALTER_USER_STMT:
return makeNode(type, sizeof(SAlterUserStmt));
return sizeof(SAlterUserStmt);
case QUERY_NODE_DROP_USER_STMT:
return makeNode(type, sizeof(SDropUserStmt));
return sizeof(SDropUserStmt);
case QUERY_NODE_USE_DATABASE_STMT:
return makeNode(type, sizeof(SUseDatabaseStmt));
return sizeof(SUseDatabaseStmt);
case QUERY_NODE_CREATE_DNODE_STMT:
return makeNode(type, sizeof(SCreateDnodeStmt));
return sizeof(SCreateDnodeStmt);
case QUERY_NODE_DROP_DNODE_STMT:
return makeNode(type, sizeof(SDropDnodeStmt));
return sizeof(SDropDnodeStmt);
case QUERY_NODE_ALTER_DNODE_STMT:
return makeNode(type, sizeof(SAlterDnodeStmt));
return sizeof(SAlterDnodeStmt);
case QUERY_NODE_CREATE_INDEX_STMT:
return makeNode(type, sizeof(SCreateIndexStmt));
return sizeof(SCreateIndexStmt);
case QUERY_NODE_DROP_INDEX_STMT:
return makeNode(type, sizeof(SDropIndexStmt));
return sizeof(SDropIndexStmt);
case QUERY_NODE_CREATE_QNODE_STMT:
case QUERY_NODE_CREATE_BNODE_STMT:
case QUERY_NODE_CREATE_SNODE_STMT:
case QUERY_NODE_CREATE_MNODE_STMT:
return makeNode(type, sizeof(SCreateComponentNodeStmt));
return sizeof(SCreateComponentNodeStmt);
case QUERY_NODE_DROP_QNODE_STMT:
case QUERY_NODE_DROP_BNODE_STMT:
case QUERY_NODE_DROP_SNODE_STMT:
case QUERY_NODE_DROP_MNODE_STMT:
return makeNode(type, sizeof(SDropComponentNodeStmt));
return sizeof(SDropComponentNodeStmt);
case QUERY_NODE_CREATE_TOPIC_STMT:
return makeNode(type, sizeof(SCreateTopicStmt));
return sizeof(SCreateTopicStmt);
case QUERY_NODE_DROP_TOPIC_STMT:
return makeNode(type, sizeof(SDropTopicStmt));
return sizeof(SDropTopicStmt);
case QUERY_NODE_EXPLAIN_STMT:
return makeNode(type, sizeof(SExplainStmt));
return sizeof(SExplainStmt);
case QUERY_NODE_DESCRIBE_STMT:
return makeNode(type, sizeof(SDescribeStmt));
return sizeof(SDescribeStmt);
case QUERY_NODE_RESET_QUERY_CACHE_STMT:
return makeNode(type, sizeof(SNode));
return sizeof(SNode);
case QUERY_NODE_COMPACT_STMT:
break;
case QUERY_NODE_CREATE_FUNCTION_STMT:
return makeNode(type, sizeof(SCreateFunctionStmt));
return sizeof(SCreateFunctionStmt);
case QUERY_NODE_DROP_FUNCTION_STMT:
return makeNode(type, sizeof(SDropFunctionStmt));
return sizeof(SDropFunctionStmt);
case QUERY_NODE_CREATE_STREAM_STMT:
return makeNode(type, sizeof(SCreateStreamStmt));
return sizeof(SCreateStreamStmt);
case QUERY_NODE_DROP_STREAM_STMT:
return makeNode(type, sizeof(SDropStreamStmt));
return sizeof(SDropStreamStmt);
case QUERY_NODE_MERGE_VGROUP_STMT:
case QUERY_NODE_REDISTRIBUTE_VGROUP_STMT:
case QUERY_NODE_SPLIT_VGROUP_STMT:
case QUERY_NODE_SYNCDB_STMT:
break;
case QUERY_NODE_GRANT_STMT:
return makeNode(type, sizeof(SGrantStmt));
return sizeof(SGrantStmt);
case QUERY_NODE_REVOKE_STMT:
return makeNode(type, sizeof(SRevokeStmt));
return sizeof(SRevokeStmt);
case QUERY_NODE_SHOW_DNODES_STMT:
case QUERY_NODE_SHOW_MNODES_STMT:
case QUERY_NODE_SHOW_MODULES_STMT:
......@@ -201,80 +192,89 @@ SNodeptr nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_CREATE_TABLE_STMT:
case QUERY_NODE_SHOW_CREATE_STABLE_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
return makeNode(type, sizeof(SShowStmt));
return sizeof(SShowStmt);
case QUERY_NODE_KILL_CONNECTION_STMT:
case QUERY_NODE_KILL_QUERY_STMT:
case QUERY_NODE_KILL_TRANSACTION_STMT:
return makeNode(type, sizeof(SKillStmt));
return sizeof(SKillStmt);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return makeNode(type, sizeof(SScanLogicNode));
return sizeof(SScanLogicNode);
case QUERY_NODE_LOGIC_PLAN_JOIN:
return makeNode(type, sizeof(SJoinLogicNode));
return sizeof(SJoinLogicNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
return makeNode(type, sizeof(SAggLogicNode));
return sizeof(SAggLogicNode);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectLogicNode));
return sizeof(SProjectLogicNode);
case QUERY_NODE_LOGIC_PLAN_VNODE_MODIF:
return makeNode(type, sizeof(SVnodeModifLogicNode));
return sizeof(SVnodeModifLogicNode);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangeLogicNode));
return sizeof(SExchangeLogicNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW:
return makeNode(type, sizeof(SWindowLogicNode));
return sizeof(SWindowLogicNode);
case QUERY_NODE_LOGIC_PLAN_FILL:
return makeNode(type, sizeof(SFillLogicNode));
return sizeof(SFillLogicNode);
case QUERY_NODE_LOGIC_PLAN_SORT:
return makeNode(type, sizeof(SSortLogicNode));
return sizeof(SSortLogicNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionLogicNode));
return sizeof(SPartitionLogicNode);
case QUERY_NODE_LOGIC_SUBPLAN:
return makeNode(type, sizeof(SLogicSubplan));
return sizeof(SLogicSubplan);
case QUERY_NODE_LOGIC_PLAN:
return makeNode(type, sizeof(SQueryLogicPlan));
return sizeof(SQueryLogicPlan);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return makeNode(type, sizeof(STagScanPhysiNode));
return sizeof(STagScanPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN:
return makeNode(type, sizeof(STableScanPhysiNode));
return sizeof(STableScanPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN:
return makeNode(type, sizeof(STableSeqScanPhysiNode));
return sizeof(STableSeqScanPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN:
return makeNode(type, sizeof(SStreamScanPhysiNode));
return sizeof(SStreamScanPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN:
return makeNode(type, sizeof(SSystemTableScanPhysiNode));
return sizeof(SSystemTableScanPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_PROJECT:
return makeNode(type, sizeof(SProjectPhysiNode));
return sizeof(SProjectPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_JOIN:
return makeNode(type, sizeof(SJoinPhysiNode));
return sizeof(SJoinPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_AGG:
return makeNode(type, sizeof(SAggPhysiNode));
return sizeof(SAggPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE:
return makeNode(type, sizeof(SExchangePhysiNode));
return sizeof(SExchangePhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_SORT:
return makeNode(type, sizeof(SSortPhysiNode));
return sizeof(SSortPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_INTERVAL:
return makeNode(type, sizeof(SIntervalPhysiNode));
return sizeof(SIntervalPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL:
return makeNode(type, sizeof(SStreamIntervalPhysiNode));
return sizeof(SStreamIntervalPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_FILL:
return makeNode(type, sizeof(SFillPhysiNode));
return sizeof(SFillPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_SESSION_WINDOW:
return makeNode(type, sizeof(SSessionWinodwPhysiNode));
return sizeof(SSessionWinodwPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_STATE_WINDOW:
return makeNode(type, sizeof(SStateWinodwPhysiNode));
return sizeof(SStateWinodwPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_PARTITION:
return makeNode(type, sizeof(SPartitionPhysiNode));
return sizeof(SPartitionPhysiNode);
case QUERY_NODE_PHYSICAL_PLAN_DISPATCH:
return makeNode(type, sizeof(SDataDispatcherNode));
return sizeof(SDataDispatcherNode);
case QUERY_NODE_PHYSICAL_PLAN_INSERT:
return makeNode(type, sizeof(SDataInserterNode));
return sizeof(SDataInserterNode);
case QUERY_NODE_PHYSICAL_SUBPLAN:
return makeNode(type, sizeof(SSubplan));
return sizeof(SSubplan);
case QUERY_NODE_PHYSICAL_PLAN:
return makeNode(type, sizeof(SQueryPlan));
return sizeof(SQueryPlan);
default:
break;
}
nodesError("nodesMakeNode unknown node = %s", nodesNodeName(type));
return NULL;
return 0;
}
SNodeptr nodesMakeNode(ENodeType type) {
SNode* p = taosMemoryCalloc(1, nodesNodeSize(type));
if (NULL == p) {
return NULL;
}
setNodeType(p, type);
return p;
}
static void destroyVgDataBlockArray(SArray* pArray) {
......
......@@ -559,6 +559,8 @@ SNode* setProjectionAlias(SAstCreateContext* pCxt, SNode* pNode, const SToken* p
int32_t len = TMIN(sizeof(((SExprNode*)pNode)->aliasName) - 1, pAlias->n);
strncpy(((SExprNode*)pNode)->aliasName, pAlias->z, len);
((SExprNode*)pNode)->aliasName[len] = '\0';
strncpy(((SExprNode*)pNode)->userAlias, pAlias->z, len);
((SExprNode*)pNode)->userAlias[len] = '\0';
return pNode;
}
......
......@@ -3446,7 +3446,11 @@ static int32_t extractQueryResultSchema(const SNodeList* pProjections, int32_t*
(*pSchema)[index].type = pExpr->resType.type;
(*pSchema)[index].bytes = pExpr->resType.bytes;
(*pSchema)[index].colId = index + 1;
strcpy((*pSchema)[index].name, pExpr->aliasName);
if ('\0' != pExpr->userAlias[0]) {
strcpy((*pSchema)[index].name, pExpr->userAlias);
} else {
strcpy((*pSchema)[index].name, pExpr->aliasName);
}
index += 1;
}
......
......@@ -41,7 +41,7 @@ static int32_t parseSqlIntoAst(SParseContext* pCxt, SQuery** pQuery) {
}
if (TSDB_CODE_SUCCESS == code && (*pQuery)->placeholderNum > 0) {
// TSWAP((*pQuery)->pContainPlaceholderRoot, (*pQuery)->pRoot);
TSWAP((*pQuery)->pPrepareRoot, (*pQuery)->pRoot);
return TSDB_CODE_SUCCESS;
}
......@@ -137,6 +137,36 @@ static int32_t setValueByBindParam(SValueNode* pVal, TAOS_MULTI_BIND* pParam) {
return TSDB_CODE_SUCCESS;
}
static EDealRes rewriteQueryExprAliasImpl(SNode* pNode, void* pContext) {
if (nodesIsExprNode(pNode) && QUERY_NODE_COLUMN != nodeType(pNode) && '\0' == ((SExprNode*)pNode)->userAlias[0]) {
strcpy(((SExprNode*)pNode)->userAlias, ((SExprNode*)pNode)->aliasName);
sprintf(((SExprNode*)pNode)->aliasName, "#%d", *(int32_t*)pContext);
++(*(int32_t*)pContext);
}
return DEAL_RES_CONTINUE;
}
static void rewriteQueryExprAlias(SNode* pRoot, int32_t* pNo) {
switch (nodeType(pRoot)) {
case QUERY_NODE_SELECT_STMT:
nodesWalkSelectStmt((SSelectStmt*)pRoot, SQL_CLAUSE_FROM, rewriteQueryExprAliasImpl, pNo);
break;
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pSetOper = (SSetOperator*)pRoot;
rewriteQueryExprAlias(pSetOper->pLeft, pNo);
rewriteQueryExprAlias(pSetOper->pRight, pNo);
break;
}
default:
break;
}
}
static void rewriteExprAlias(SNode* pRoot) {
int32_t no = 1;
rewriteQueryExprAlias(pRoot, &no);
}
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery) {
int32_t code = TSDB_CODE_SUCCESS;
if (isInsertSql(pCxt->pSql, pCxt->sqlLen)) {
......@@ -169,6 +199,15 @@ int32_t qStmtBindParams(SQuery* pQuery, TAOS_MULTI_BIND* pParams, int32_t colIdx
code = setValueByBindParam((SValueNode*)taosArrayGetP(pQuery->pPlaceholderValues, colIdx), pParams);
}
if (TSDB_CODE_SUCCESS == code && (colIdx < 0 || colIdx + 1 == pQuery->placeholderNum)) {
pQuery->pRoot = nodesCloneNode(pQuery->pPrepareRoot);
if (NULL == pQuery->pRoot) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (TSDB_CODE_SUCCESS == code) {
rewriteExprAlias(pQuery->pRoot);
}
return code;
}
......
......@@ -20,49 +20,147 @@ using namespace std;
class PlanStmtTest : public PlannerTestBase {
public:
void buildParam(TAOS_MULTI_BIND* pBindParams, int32_t index, void* pVal, int32_t type, int32_t bytes = 0) {
TAOS_MULTI_BIND* pBindParam = pBindParams + index;
pBindParam->buffer_type = type;
pBindParam->num = 1;
pBindParam->buffer_length = bytes > 0 ? bytes : tDataTypes[type].bytes;
pBindParam->buffer = taosMemoryCalloc(1, pBindParam->buffer_length);
pBindParam->length = (int32_t*)taosMemoryCalloc(1, sizeof(int32_t));
pBindParam->is_null = (char*)taosMemoryCalloc(1, sizeof(char));
*(pBindParam->length) = bytes > 0 ? bytes : tDataTypes[type].bytes;
*(pBindParam->is_null) = 0;
TAOS_MULTI_BIND* createBindParams(int32_t nParams) {
return (TAOS_MULTI_BIND*)taosMemoryCalloc(nParams, sizeof(TAOS_MULTI_BIND));
}
TAOS_MULTI_BIND* buildIntegerParam(TAOS_MULTI_BIND* pBindParams, int32_t index, int64_t val, int32_t type) {
TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0);
switch (type) {
case TSDB_DATA_TYPE_BOOL:
*((bool*)pBindParam->buffer) = *(bool*)pVal;
*((bool*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_TINYINT:
*((int8_t*)pBindParam->buffer) = *(int64_t*)pVal;
*((int8_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_SMALLINT:
*((int16_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_INT:
*((int32_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_FLOAT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_VARCHAR:
*((int64_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
*((int64_t*)pBindParam->buffer) = val;
break;
default:
break;
}
return pBindParam;
}
TAOS_MULTI_BIND* buildUIntegerParam(TAOS_MULTI_BIND* pBindParams, int32_t index, uint64_t val, int32_t type) {
TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0);
switch (type) {
case TSDB_DATA_TYPE_UTINYINT:
*((uint8_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_USMALLINT:
*((uint16_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_UINT:
*((uint32_t*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_JSON:
*((uint64_t*)pBindParam->buffer) = val;
break;
default:
break;
}
return pBindParam;
}
TAOS_MULTI_BIND* buildDoubleParam(TAOS_MULTI_BIND* pBindParams, int32_t index, double val, int32_t type) {
TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, 0);
switch (type) {
case TSDB_DATA_TYPE_FLOAT:
*((float*)pBindParam->buffer) = val;
break;
case TSDB_DATA_TYPE_DOUBLE:
*((double*)pBindParam->buffer) = val;
break;
default:
break;
}
return pBindParam;
}
TAOS_MULTI_BIND* buildStringParam(TAOS_MULTI_BIND* pBindParams, int32_t index, const char* pVal, int32_t type,
int32_t bytes) {
TAOS_MULTI_BIND* pBindParam = initParam(pBindParams, index, type, bytes);
switch (type) {
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_VARBINARY:
case TSDB_DATA_TYPE_DECIMAL:
case TSDB_DATA_TYPE_BLOB:
case TSDB_DATA_TYPE_MEDIUMBLOB:
strncpy((char*)pBindParam->buffer, pVal, bytes);
break;
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_NCHAR:
default:
break;
}
return pBindParam;
}
private:
TAOS_MULTI_BIND* initParam(TAOS_MULTI_BIND* pBindParams, int32_t index, int32_t type, int32_t bytes) {
TAOS_MULTI_BIND* pBindParam = pBindParams + index;
pBindParam->buffer_type = type;
pBindParam->num = 1;
pBindParam->buffer_length = bytes > 0 ? bytes : tDataTypes[type].bytes;
pBindParam->buffer = taosMemoryCalloc(1, pBindParam->buffer_length);
pBindParam->length = (int32_t*)taosMemoryCalloc(1, sizeof(int32_t));
pBindParam->is_null = (char*)taosMemoryCalloc(1, sizeof(char));
*(pBindParam->length) = bytes > 0 ? bytes : tDataTypes[type].bytes;
*(pBindParam->is_null) = 0;
return pBindParam;
}
};
TEST_F(PlanStmtTest, stmt) {
TEST_F(PlanStmtTest, basic) {
useDb("root", "test");
prepare("SELECT * FROM t1 WHERE c1 = ?");
bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0);
exec();
{
prepare("SELECT * FROM t1 WHERE c1 = ? AND c2 = ?");
TAOS_MULTI_BIND* pBindParams = createBindParams(2);
buildIntegerParam(pBindParams, 0, 10, TSDB_DATA_TYPE_INT);
buildStringParam(pBindParams, 1, "abc", TSDB_DATA_TYPE_VARCHAR, strlen("abc"));
bindParams(pBindParams, -1);
exec();
taosMemoryFreeClear(pBindParams);
}
{
prepare("SELECT MAX(?), MAX(?) FROM t1");
TAOS_MULTI_BIND* pBindParams = createBindParams(2);
buildIntegerParam(pBindParams, 0, 10, TSDB_DATA_TYPE_TINYINT);
buildIntegerParam(pBindParams, 1, 20, TSDB_DATA_TYPE_INT);
bindParams(pBindParams, -1);
exec();
taosMemoryFreeClear(pBindParams);
}
}
TEST_F(PlanStmtTest, multiExec) {
useDb("root", "test");
prepare("SELECT * FROM t1 WHERE c1 = ?");
bindParams(buildIntegerParam(createBindParams(1), 0, 10, TSDB_DATA_TYPE_INT), 0);
exec();
bindParams(buildIntegerParam(createBindParams(1), 0, 20, TSDB_DATA_TYPE_INT), 0);
exec();
bindParams(buildIntegerParam(createBindParams(1), 0, 30, TSDB_DATA_TYPE_INT), 0);
exec();
}
TEST_F(PlanStmtTest, allDataType) { useDb("root", "test"); }
......@@ -112,8 +112,6 @@ class PlannerTestBaseImpl {
reset();
try {
doParseSql(sql, &stmtEnv_.pQuery_, true);
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
......@@ -123,6 +121,15 @@ class PlannerTestBaseImpl {
void bindParams(TAOS_MULTI_BIND* pParams, int32_t colIdx) {
try {
doBindParams(stmtEnv_.pQuery_, pParams, colIdx);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
}
}
void exec() {
try {
doParseBoundSql(stmtEnv_.pQuery_);
SPlanContext cxt = {0};
setPlanContext(stmtEnv_.pQuery_, &cxt);
......@@ -148,17 +155,6 @@ class PlannerTestBaseImpl {
}
}
void exec() {
try {
doParseBoundSql(stmtEnv_.pQuery_);
dump(g_dumpModule);
} catch (...) {
dump(DUMP_MODULE_ALL);
throw;
}
}
private:
struct caseEnv {
string acctId_;
......@@ -208,11 +204,17 @@ class PlannerTestBaseImpl {
cout << "==========================================sql : [" << stmtEnv_.sql_ << "]" << endl;
if (DUMP_MODULE_ALL == module || DUMP_MODULE_PARSER == module) {
cout << "syntax tree : " << endl;
cout << res_.ast_ << endl;
cout << "bound syntax tree : " << endl;
cout << res_.boundAst_ << endl;
if (res_.prepareAst_.empty()) {
cout << "syntax tree : " << endl;
cout << res_.ast_ << endl;
} else {
cout << "prepare syntax tree : " << endl;
cout << res_.prepareAst_ << endl;
cout << "bound syntax tree : " << endl;
cout << res_.boundAst_ << endl;
cout << "syntax tree : " << endl;
cout << res_.ast_ << endl;
}
}
if (DUMP_MODULE_ALL == module || DUMP_MODULE_LOGIC == module) {
......@@ -262,7 +264,7 @@ class PlannerTestBaseImpl {
DO_WITH_THROW(qParseSql, &cxt, pQuery);
if (prepare) {
res_.prepareAst_ = toString((*pQuery)->pRoot);
res_.prepareAst_ = toString((*pQuery)->pPrepareRoot);
} else {
res_.ast_ = toString((*pQuery)->pRoot);
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册