提交 a98d86d4 编写于 作者: H Hongze Cheng

Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/alter_table

...@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t; ...@@ -70,7 +70,7 @@ typedef uint16_t tmsg_t;
#define TSDB_IE_TYPE_DNODE_EXT 6 #define TSDB_IE_TYPE_DNODE_EXT 6
#define TSDB_IE_TYPE_DNODE_STATE 7 #define TSDB_IE_TYPE_DNODE_STATE 7
enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__MAX }; enum { CONN_TYPE__QUERY = 1, CONN_TYPE__TMQ, CONN_TYPE__UDFD, CONN_TYPE__MAX };
enum { enum {
HEARTBEAT_KEY_USER_AUTHINFO = 1, HEARTBEAT_KEY_USER_AUTHINFO = 1,
......
...@@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx); ...@@ -105,7 +105,12 @@ int32_t mavgFunction(SqlFunctionCtx* pCtx);
bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSampleFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo); bool sampleFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t sampleFunction(SqlFunctionCtx* pCtx); int32_t sampleFunction(SqlFunctionCtx* pCtx);
int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); //int32_t sampleFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getTailFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
int32_t tailFunction(SqlFunctionCtx* pCtx);
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv); bool getSelectivityFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv);
......
...@@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len) ...@@ -386,6 +386,35 @@ static int32_t translateSample(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t translateTail(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
if (2 != numOfParams && 3 != numOfParams) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
SNode* pPara = nodesListGetNode(pFunc->pParameterList, 0);
if (QUERY_NODE_COLUMN != nodeType(pPara)) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"The input parameter of TAIL function can only be column");
}
for (int32_t i = 1; i < numOfParams; ++i) {
uint8_t paraType = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, i))->resType.type;
if (!IS_INTEGER_TYPE(paraType)) {
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
}
}
SExprNode* pCol = (SExprNode*)nodesListGetNode(pFunc->pParameterList, 0);
uint8_t colType = pCol->resType.type;
if (IS_VAR_DATA_TYPE(colType)) {
pFunc->node.resType = (SDataType){.bytes = pCol->resType.bytes, .type = colType};
} else {
pFunc->node.resType = (SDataType){.bytes = tDataTypes[colType].bytes, .type = colType};
}
return TSDB_CODE_SUCCESS;
}
static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { static int32_t translateLastRow(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
// todo // todo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { ...@@ -850,6 +879,16 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.processFunc = sampleFunction, .processFunc = sampleFunction,
.finalizeFunc = NULL .finalizeFunc = NULL
}, },
{
.name = "tail",
.type = FUNCTION_TYPE_TAIL,
.classification = FUNC_MGT_NONSTANDARD_SQL_FUNC | FUNC_MGT_TIMELINE_FUNC,
.translateFunc = translateTail,
.getEnvFunc = getTailFuncEnv,
.initFunc = tailFunctionSetup,
.processFunc = tailFunction,
.finalizeFunc = tailFinalize
},
{ {
.name = "abs", .name = "abs",
.type = FUNCTION_TYPE_ABS, .type = FUNCTION_TYPE_ABS,
......
...@@ -18,12 +18,15 @@ ...@@ -18,12 +18,15 @@
#include "function.h" #include "function.h"
#include "querynodes.h" #include "querynodes.h"
#include "taggfunction.h" #include "taggfunction.h"
#include "tcompare.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tpercentile.h" #include "tpercentile.h"
#define HISTOGRAM_MAX_BINS_NUM 1000 #define HISTOGRAM_MAX_BINS_NUM 1000
#define MAVG_MAX_POINTS_NUM 1000 #define MAVG_MAX_POINTS_NUM 1000
#define SAMPLE_MAX_POINTS_NUM 1000 #define SAMPLE_MAX_POINTS_NUM 1000
#define TAIL_MAX_POINTS_NUM 100
#define TAIL_MAX_OFFSET 100
typedef struct SSumRes { typedef struct SSumRes {
union { union {
...@@ -161,6 +164,21 @@ typedef struct SSampleInfo { ...@@ -161,6 +164,21 @@ typedef struct SSampleInfo {
int64_t *timestamp; int64_t *timestamp;
} SSampleInfo; } SSampleInfo;
typedef struct STailItem {
int64_t timestamp;
bool isNull;
char data[];
} STailItem;
typedef struct STailInfo {
int32_t numOfPoints;
int32_t numAdded;
int32_t offset;
uint8_t colType;
int16_t colBytes;
STailItem **pItems;
} STailInfo;
#define SET_VAL(_info, numOfElem, res) \ #define SET_VAL(_info, numOfElem, res) \
do { \ do { \
if ((numOfElem) <= 0) { \ if ((numOfElem) <= 0) { \
...@@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -3107,7 +3125,7 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
int32_t startOffset = pCtx->offset; int32_t startOffset = pCtx->offset;
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) { for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex; i += 1) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_s(pInputCol, i)) {
//colDataAppendNULL(pOutput, i); //colDataAppendNULL(pOutput, i);
continue; continue;
} }
...@@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) { ...@@ -3141,3 +3159,132 @@ int32_t sampleFunction(SqlFunctionCtx* pCtx) {
// //
// return pResInfo->numOfRes; // return pResInfo->numOfRes;
//} //}
bool getTailFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
SColumnNode* pCol = (SColumnNode*)nodesListGetNode(pFunc->pParameterList, 0);
SValueNode* pVal = (SValueNode*)nodesListGetNode(pFunc->pParameterList, 1);
int32_t numOfPoints = pVal->datum.i;
pEnv->calcMemSize = sizeof(STailInfo) + numOfPoints * (POINTER_BYTES + sizeof(STailItem) + pCol->node.resType.bytes);
return true;
}
bool tailFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo *pResultInfo) {
if (!functionSetup(pCtx, pResultInfo)) {
return false;
}
STailInfo *pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
pInfo->numAdded = 0;
pInfo->numOfPoints = pCtx->param[1].param.i;
if (pCtx->numOfParams == 4) {
pInfo->offset = pCtx->param[2].param.i;
} else {
pInfo->offset = 0;
}
pInfo->colType = pCtx->resDataInfo.type;
pInfo->colBytes = pCtx->resDataInfo.bytes;
if ((pInfo->numOfPoints < 1 || pInfo->numOfPoints > TAIL_MAX_POINTS_NUM) ||
(pInfo->numOfPoints < 0 || pInfo->numOfPoints > TAIL_MAX_OFFSET)) {
return false;
}
pInfo->pItems = (STailItem **)((char *)pInfo + sizeof(STailInfo));
char *pItem = (char *)pInfo->pItems + pInfo->numOfPoints * POINTER_BYTES;
size_t unitSize = sizeof(STailItem) + pInfo->colBytes;
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
pInfo->pItems[i] = (STailItem *)(pItem + i * unitSize);
pInfo->pItems[i]->isNull = false;
}
return true;
}
static void tailAssignResult(STailItem* pItem, char *data, int32_t colBytes, TSKEY ts, bool isNull) {
pItem->timestamp = ts;
if (isNull) {
pItem->isNull = true;
} else {
memcpy(pItem->data, data, colBytes);
}
}
static int32_t tailCompFn(const void *p1, const void *p2, const void *param) {
STailItem *d1 = *(STailItem **)p1;
STailItem *d2 = *(STailItem **)p2;
return compareInt64Val(&d1->timestamp, &d2->timestamp);
}
static void doTailAdd(STailInfo* pInfo, char *data, TSKEY ts, bool isNull) {
STailItem **pList = pInfo->pItems;
if (pInfo->numAdded < pInfo->numOfPoints) {
tailAssignResult(pList[pInfo->numAdded], data, pInfo->colBytes, ts, isNull);
taosheapsort((void *)pList, sizeof(STailItem **), pInfo->numAdded + 1, NULL, tailCompFn, 0);
pInfo->numAdded++;
} else if (pList[0]->timestamp < ts) {
tailAssignResult(pList[0], data, pInfo->colBytes, ts, isNull);
taosheapadjust((void *)pList, sizeof(STailItem **), 0, pInfo->numOfPoints - 1, NULL, tailCompFn, NULL, 0);
}
}
int32_t tailFunction(SqlFunctionCtx* pCtx) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
SColumnInfoData* pInputCol = pInput->pData[0];
SColumnInfoData* pTsOutput = pCtx->pTsOutput;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
int32_t startOffset = pCtx->offset;
if (pInfo->offset >= pInput->numOfRows) {
return 0;
} else {
pInfo->numOfPoints = MIN(pInfo->numOfPoints, pInput->numOfRows - pInfo->offset);
}
for (int32_t i = pInput->startRowIndex; i < pInput->numOfRows + pInput->startRowIndex - pInfo->offset; i += 1) {
char* data = colDataGetData(pInputCol, i);
doTailAdd(pInfo, data, tsList[i], colDataIsNull_s(pInputCol, i));
}
taosqsort(pInfo->pItems, pInfo->numOfPoints, POINTER_BYTES, NULL, tailCompFn);
for (int32_t i = 0; i < pInfo->numOfPoints; ++i) {
int32_t pos = startOffset + i;
STailItem *pItem = pInfo->pItems[i];
if (pItem->isNull) {
colDataAppendNULL(pOutput, pos);
} else {
colDataAppend(pOutput, pos, pItem->data, false);
}
}
return pInfo->numOfPoints;
}
int32_t tailFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx);
STailInfo* pInfo = GET_ROWCELL_INTERBUF(pEntryInfo);
pEntryInfo->complete = true;
int32_t type = pCtx->input.pData[0]->info.type;
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
// todo assign the tag value and the corresponding row data
int32_t currentRow = pBlock->info.rows;
for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) {
STailItem *pItem = pInfo->pItems[i];
colDataAppend(pCol, currentRow, pItem->data, false);
//setSelectivityValue(pCtx, pBlock, &pInfo->pItems[i].tuplePos, currentRow);
currentRow += 1;
}
return pEntryInfo->numOfRes;
}
...@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub { ...@@ -312,6 +312,7 @@ typedef struct SUdfcFuncStub {
char udfName[TSDB_FUNC_NAME_LEN]; char udfName[TSDB_FUNC_NAME_LEN];
UdfcFuncHandle handle; UdfcFuncHandle handle;
int32_t refCount; int32_t refCount;
int64_t lastRefTime;
} SUdfcFuncStub; } SUdfcFuncStub;
typedef struct SUdfcProxy { typedef struct SUdfcProxy {
...@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { ...@@ -1446,6 +1447,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
uv_mutex_unlock(&gUdfdProxy.udfStubsMutex); uv_mutex_unlock(&gUdfdProxy.udfStubsMutex);
*pHandle = foundStub->handle; *pHandle = foundStub->handle;
++foundStub->refCount; ++foundStub->refCount;
foundStub->lastRefTime = taosGetTimestampUs();
return 0; return 0;
} }
*pHandle = NULL; *pHandle = NULL;
...@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) { ...@@ -1455,6 +1457,7 @@ int32_t accquireUdfFuncHandle(char* udfName, UdfcFuncHandle* pHandle) {
strcpy(stub.udfName, udfName); strcpy(stub.udfName, udfName);
stub.handle = *pHandle; stub.handle = *pHandle;
++stub.refCount; ++stub.refCount;
stub.lastRefTime = taosGetTimestampUs();
taosArrayPush(gUdfdProxy.udfStubs, &stub); taosArrayPush(gUdfdProxy.udfStubs, &stub);
taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub); taosArraySort(gUdfdProxy.udfStubs, compareUdfcFuncSub);
} else { } else {
...@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() { ...@@ -1662,7 +1665,8 @@ int32_t cleanUpUdfs() {
fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle); fnInfo("tear down udf. udf name: %s, handle: %p", stub->udfName, stub->handle);
doTeardownUdf(stub->handle); doTeardownUdf(stub->handle);
} else { } else {
fnInfo("udf still in use. udf name: %s, ref count: %d, handle: %p", stub->udfName, stub->refCount, stub->handle); fnInfo("udf still in use. udf name: %s, ref count: %d, last ref time: %"PRId64", handle: %p",
stub->udfName, stub->refCount, stub->lastRefTime, stub->handle);
taosArrayPush(udfStubs, stub); taosArrayPush(udfStubs, stub);
} }
++i; ++i;
......
...@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { ...@@ -485,7 +485,146 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
uv_stop(global.loop); uv_stop(global.loop);
} }
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } typedef enum EUdfdRpcReqRspType {
UDFD_RPC_MNODE_CONNECT = 0,
UDFD_RPC_RETRIVE_FUNC,
} EUdfdRpcReqRspType;
typedef struct SUdfdRpcSendRecvInfo {
EUdfdRpcReqRspType rpcType;
int32_t code;
void* param;
uv_sem_t resultSem;
} SUdfdRpcSendRecvInfo;
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) {
SUdfdRpcSendRecvInfo *msgInfo = (SUdfdRpcSendRecvInfo *)pMsg->ahandle;
ASSERT(pMsg->ahandle != NULL);
if (pEpSet) {
if (!isEpsetEqual(&global.mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&global.mgmtEp, pEpSet);
}
}
if (pMsg->code != TSDB_CODE_SUCCESS) {
fnError("udfd rpc error. code: %s", tstrerror(pMsg->code));
msgInfo->code = pMsg->code;
goto _return;
}
if (msgInfo->rpcType == UDFD_RPC_MNODE_CONNECT) {
SConnectRsp connectRsp = {0};
tDeserializeSConnectRsp(pMsg->pCont, pMsg->contLen, &connectRsp);
if (connectRsp.epSet.numOfEps == 0) {
msgInfo->code = TSDB_CODE_MND_APP_ERROR;
goto _return;
}
if (connectRsp.dnodeNum > 1 && !isEpsetEqual(&global.mgmtEp.epSet, &connectRsp.epSet)) {
updateEpSet_s(&global.mgmtEp, &connectRsp.epSet);
}
msgInfo->code = 0;
} else if (msgInfo->rpcType == UDFD_RPC_RETRIVE_FUNC) {
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(pMsg->pCont, pMsg->contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
SUdf* udf = msgInfo->param;
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", pFuncInfo->name);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
msgInfo->code = 0;
}
_return:
rpcFreeCont(pMsg->pCont);
uv_sem_post(&msgInfo->resultSem);
return;
}
int32_t udfdConnectToMNode() {
SConnectReq connReq = {0};
connReq.connType = CONN_TYPE__UDFD;
tstrncpy(connReq.app, "udfd",sizeof(connReq.app));
tstrncpy(connReq.user, TSDB_DEFAULT_USER, sizeof(connReq.user));
char pass[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
tstrncpy(connReq.passwd, pass, sizeof(connReq.passwd));
connReq.pid = htonl(taosGetPId());
connReq.startTime = htobe64(taosGetTimestampMs());
int32_t contLen = tSerializeSConnectReq(NULL, 0, &connReq);
void* pReq = rpcMallocCont(contLen);
tSerializeSConnectReq(pReq, contLen, &connReq);
SUdfdRpcSendRecvInfo *msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_MNODE_CONNECT;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.msgType = TDMT_MND_CONNECT;
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(global.clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
int32_t code = msgInfo->code;
uv_sem_destroy(&msgInfo->resultSem);
taosMemoryFree(msgInfo);
return code;
}
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SUdfdRpcSendRecvInfo* msgInfo = taosMemoryCalloc(1, sizeof(SUdfdRpcSendRecvInfo));
msgInfo->rpcType = UDFD_RPC_RETRIVE_FUNC;
msgInfo->param = udf;
uv_sem_init(&msgInfo->resultSem, 0);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
rpcMsg.ahandle = msgInfo;
rpcSendRequest(clientRpc, &global.mgmtEp.epSet, &rpcMsg, NULL);
uv_sem_wait(&msgInfo->resultSem);
uv_sem_destroy(&msgInfo->resultSem);
int32_t code = msgInfo->code;
taosMemoryFree(msgInfo);
return code;
}
static bool udfdRpcRfp(int32_t code) {
if (code == TSDB_CODE_RPC_REDIRECT) {
return true;
} else {
return false;
}
}
int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) { int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSet) {
pEpSet->version = 0; pEpSet->version = 0;
...@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe ...@@ -528,69 +667,30 @@ int initEpSetFromCfg(const char* firstEp, const char* secondEp, SCorEpSet* pEpSe
return 0; return 0;
} }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
taosArrayPush(retrieveReq.pFuncNames, udfName);
int32_t contLen = tSerializeSRetrieveFuncReq(NULL, 0, &retrieveReq);
void *pReq = rpcMallocCont(contLen);
tSerializeSRetrieveFuncReq(pReq, contLen, &retrieveReq);
taosArrayDestroy(retrieveReq.pFuncNames);
SRpcMsg rpcMsg = {0};
rpcMsg.pCont = pReq;
rpcMsg.contLen = contLen;
rpcMsg.msgType = TDMT_MND_RETRIEVE_FUNC;
SRpcMsg rpcRsp = {0};
rpcSendRecv(clientRpc, &global.mgmtEp.epSet, &rpcMsg, &rpcRsp);
SRetrieveFuncRsp retrieveRsp = {0};
tDeserializeSRetrieveFuncRsp(rpcRsp.pCont, rpcRsp.contLen, &retrieveRsp);
SFuncInfo *pFuncInfo = (SFuncInfo *)taosArrayGet(retrieveRsp.pFuncInfos, 0);
udf->funcType = pFuncInfo->funcType;
udf->scriptType = pFuncInfo->scriptType;
udf->outputType = pFuncInfo->funcType;
udf->outputLen = pFuncInfo->outputLen;
udf->bufSize = pFuncInfo->bufSize;
char path[PATH_MAX] = {0};
snprintf(path, sizeof(path), "%s/lib%s.so", "/tmp", udfName);
TdFilePtr file = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_READ | TD_FILE_TRUNC | TD_FILE_AUTO_DEL);
// TODO check for failure of flush to disk
taosWriteFile(file, pFuncInfo->pCode, pFuncInfo->codeSize);
taosCloseFile(&file);
strncpy(udf->path, path, strlen(path));
taosArrayDestroy(retrieveRsp.pFuncInfos);
rpcFreeCont(rpcRsp.pCont);
return 0;
}
int32_t udfdOpenClientRpc() { int32_t udfdOpenClientRpc() {
char *pass = "taosdata";
char *user = "root";
char secretEncrypt[TSDB_PASSWORD_LEN + 1] = {0};
taosEncryptPass_c((uint8_t *)pass, strlen(pass), secretEncrypt);
SRpcInit rpcInit = {0}; SRpcInit rpcInit = {0};
rpcInit.label = (char *)"UDFD"; rpcInit.label = "UDFD";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = udfdProcessRpcRsp; rpcInit.cfp = (RpcCfp)udfdProcessRpcRsp;
rpcInit.sessions = 1024; rpcInit.sessions = 1024;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.idleTime = 30 * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.user = TSDB_DEFAULT_USER;
rpcInit.ckey = "key";
rpcInit.spi = 1;
rpcInit.parent = &global; rpcInit.parent = &global;
rpcInit.rfp = udfdRpcRfp;
rpcInit.user = (char *)user; char pass[TSDB_PASSWORD_LEN + 1] = {0};
rpcInit.ckey = (char *)"key"; taosEncryptPass_c((uint8_t *)(TSDB_DEFAULT_PASS), strlen(TSDB_DEFAULT_PASS), pass);
rpcInit.secret = (char *)secretEncrypt; rpcInit.secret = pass;
rpcInit.spi = 1;
global.clientRpc = rpcOpen(&rpcInit); global.clientRpc = rpcOpen(&rpcInit);
if (global.clientRpc == NULL) {
fnError("failed to init dnode rpc client");
return -1;
}
return 0; return 0;
} }
...@@ -700,12 +800,6 @@ static int32_t udfdRun() { ...@@ -700,12 +800,6 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex); uv_mutex_init(&global.udfsMutex);
// TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -1;
}
if (udfdUvInit() != 0) { if (udfdUvInit() != 0) {
fnError("uv init failure"); fnError("uv init failure");
return -2; return -2;
...@@ -717,7 +811,6 @@ static int32_t udfdRun() { ...@@ -717,7 +811,6 @@ static int32_t udfdRun() {
int codeClose = uv_loop_close(global.loop); int codeClose = uv_loop_close(global.loop);
fnDebug("uv loop close. result: %s", uv_err_name(codeClose)); fnDebug("uv loop close. result: %s", uv_err_name(codeClose));
removeListeningPipe(); removeListeningPipe();
udfdCloseClientRpc();
uv_mutex_destroy(&global.udfsMutex); uv_mutex_destroy(&global.udfsMutex);
taosHashCleanup(global.udfsHash); taosHashCleanup(global.udfsHash);
return 0; return 0;
...@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) { ...@@ -746,9 +839,22 @@ int main(int argc, char *argv[]) {
if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) { if (taosInitCfg(configDir, NULL, NULL, NULL, NULL, 0) != 0) {
fnError("failed to start since read config error"); fnError("failed to start since read config error");
return -1; return -2;
} }
initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp); initEpSetFromCfg(tsFirst, tsSecond, &global.mgmtEp);
return udfdRun(); if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure");
return -3;
}
if (udfdConnectToMNode() != 0) {
fnError("failed to start since can not connect to mnode");
return -4;
}
udfdRun();
udfdCloseClientRpc();
} }
...@@ -33,11 +33,17 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond; ...@@ -33,11 +33,17 @@ typedef enum { MATCH, CONTINUE, BREAK } TExeCond;
typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type); typedef TExeCond (*_cache_range_compare)(void* a, void* b, int8_t type);
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b); TExeCond tCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b, int8_t dType);
TExeCond tDoCompare(__compar_fn_t func, int8_t cmpType, void* a, void* b);
_cache_range_compare indexGetCompare(RangeType ty); _cache_range_compare indexGetCompare(RangeType ty);
int32_t indexConvertData(void* src, int8_t type, void** dst); int32_t indexConvertData(void* src, int8_t type, void** dst);
int32_t indexConvertDataToStr(void* src, int8_t type, void** dst);
int32_t indexGetDataByteLen(int8_t type);
char* indexInt2str(int64_t val, char* dst, int radix);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -109,17 +109,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) { ...@@ -109,17 +109,15 @@ int indexOpen(SIndexOpts* opts, const char* path, SIndex** index) {
taosThreadMutexInit(&sIdx->mtx, NULL); taosThreadMutexInit(&sIdx->mtx, NULL);
sIdx->refId = indexAddRef(sIdx); sIdx->refId = indexAddRef(sIdx);
taosAcquireRef(indexRefMgt, sIdx->refId); indexAcquireRef(sIdx->refId);
*index = sIdx; *index = sIdx;
return 0; return 0;
END: END:
if (sIdx != NULL) { if (sIdx != NULL) {
indexClose(sIdx); indexClose(sIdx);
} }
*index = NULL; *index = NULL;
return -1; return -1;
} }
...@@ -273,7 +271,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy ...@@ -273,7 +271,7 @@ SIndexTerm* indexTermCreate(int64_t suid, SIndexOperOnColumn oper, uint8_t colTy
tm->nColName = nColName; tm->nColName = nColName;
char* buf = NULL; char* buf = NULL;
int32_t len = indexConvertData((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf); int32_t len = indexConvertDataToStr((void*)colVal, INDEX_TYPE_GET_TYPE(colType), (void**)&buf);
assert(len != -1); assert(len != -1);
tm->colVal = buf; tm->colVal = buf;
......
...@@ -282,8 +282,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe ...@@ -282,8 +282,10 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
if (0 != strncmp(c->colVal, pCt->colVal, skip)) { if (0 != strncmp(c->colVal, pCt->colVal, skip)) {
break; break;
} }
char* p = taosMemoryCalloc(1, strlen(c->colVal) + 1);
memcpy(p, c->colVal, strlen(c->colVal));
TExeCond cond = cmpFn(c->colVal + skip, term->colVal, dType); TExeCond cond = cmpFn(p + skip, term->colVal, dType);
if (cond == MATCH) { if (cond == MATCH) {
if (c->operaType == ADD_VALUE) { if (c->operaType == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid) INDEX_MERGE_ADD_DEL(tr->deled, tr->added, c->uid)
...@@ -297,6 +299,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe ...@@ -297,6 +299,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
} else if (cond == BREAK) { } else if (cond == BREAK) {
break; break;
} }
taosMemoryFree(p);
} }
taosMemoryFree(pCt); taosMemoryFree(pCt);
...@@ -463,7 +466,6 @@ int indexCacheSchedToMerge(IndexCache* pCache) { ...@@ -463,7 +466,6 @@ int indexCacheSchedToMerge(IndexCache* pCache) {
// schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t)); // schedMsg.thandle = taosMemoryCalloc(1, sizeof(int64_t));
// memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t)); // memcpy((char*)(schedMsg.thandle), (char*)&(pCache->index->refId), sizeof(int64_t));
schedMsg.msg = NULL; schedMsg.msg = NULL;
indexAcquireRef(pCache->index->refId); indexAcquireRef(pCache->index->refId);
taosScheduleTask(indexQhandle, &schedMsg); taosScheduleTask(indexQhandle, &schedMsg);
......
...@@ -19,10 +19,38 @@ ...@@ -19,10 +19,38 @@
#include "tcoding.h" #include "tcoding.h"
#include "tcompare.h" #include "tcompare.h"
#include "tdataformat.h" #include "tdataformat.h"
#include "ttypes.h"
char JSON_COLUMN[] = "JSON"; char JSON_COLUMN[] = "JSON";
char JSON_VALUE_DELIM = '&'; char JSON_VALUE_DELIM = '&';
char* indexInt2str(int64_t val, char* dst, int radix) {
char buffer[65];
char* p;
int64_t new_val;
uint64_t uval = (uint64_t)val;
if (radix < 0) {
if (val < 0) {
*dst++ = '-';
uval = (uint64_t)0 - uval; /* Avoid integer overflow in (-val) for LLONG_MIN (BUG#31799). */
}
}
p = &buffer[sizeof(buffer) - 1];
*p = '\0';
new_val = (int64_t)(uval / 10);
*--p = '0' + (char)(uval - (uint64_t)new_val * 10);
val = new_val;
while (val != 0) {
new_val = val / 10;
*--p = '0' + (char)(val - new_val * 10);
val = new_val;
}
while ((*dst++ = *p++) != 0)
;
return dst - 1;
}
static __compar_fn_t indexGetCompar(int8_t type) { static __compar_fn_t indexGetCompar(int8_t type) {
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
return (__compar_fn_t)strcmp; return (__compar_fn_t)strcmp;
...@@ -31,25 +59,49 @@ static __compar_fn_t indexGetCompar(int8_t type) { ...@@ -31,25 +59,49 @@ static __compar_fn_t indexGetCompar(int8_t type) {
} }
static TExeCond tCompareLessThan(void* a, void* b, int8_t type) { static TExeCond tCompareLessThan(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_THAN, a, b); return tCompare(func, QUERY_LESS_THAN, a, b, type);
} }
static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) { static TExeCond tCompareLessEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_LESS_EQUAL, a, b); return tCompare(func, QUERY_LESS_EQUAL, a, b, type);
} }
static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) { static TExeCond tCompareGreaterThan(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_THAN, a, b); return tCompare(func, QUERY_GREATER_THAN, a, b, type);
} }
static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) { static TExeCond tCompareGreaterEqual(void* a, void* b, int8_t type) {
__compar_fn_t func = indexGetCompar(type); __compar_fn_t func = indexGetCompar(type);
return tDoCommpare(func, QUERY_GREATER_EQUAL, a, b); return tCompare(func, QUERY_GREATER_EQUAL, a, b, type);
} }
TExeCond tCompare(__compar_fn_t func, int8_t cmptype, void* a, void* b, int8_t dtype) {
TExeCond tDoCommpare(__compar_fn_t func, int8_t comType, void* a, void* b) { if (dtype == TSDB_DATA_TYPE_BINARY || dtype == TSDB_DATA_TYPE_NCHAR) {
return tDoCompare(func, cmptype, a, b);
}
#if 1
int8_t bytes = tDataTypes[dtype].bytes;
if (bytes == 1) {
int8_t va = taosStr2int64(a);
int8_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else if (bytes == 2) {
int16_t va = taosStr2int64(a);
int16_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else if (bytes == 4) {
int32_t va = taosStr2int64(a);
int32_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
} else {
int64_t va = taosStr2int64(a);
int64_t vb = taosStr2int64(b);
return tDoCompare(func, cmptype, &va, &vb);
}
#endif
}
TExeCond tDoCompare(__compar_fn_t func, int8_t comparType, void* a, void* b) {
// optime later // optime later
int32_t ret = func(a, b); int32_t ret = func(a, b);
switch (comType) { switch (comparType) {
case QUERY_LESS_THAN: { case QUERY_LESS_THAN: {
if (ret < 0) return MATCH; if (ret < 0) return MATCH;
} break; } break;
...@@ -174,9 +226,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { ...@@ -174,9 +226,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeFixedU32(dst, *(uint32_t*)src); tlen = taosEncodeFixedU32(dst, *(uint32_t*)src);
break; break;
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT:
tlen = taosEncodeFixedI64(NULL, *(uint32_t*)src); tlen = taosEncodeFixedI64(NULL, *(int64_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1); *dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedI64(dst, *(uint32_t*)src); tlen = taosEncodeFixedI64(dst, *(int64_t*)src);
break; break;
case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_DOUBLE:
tlen = taosEncodeBinary(NULL, src, sizeof(double)); tlen = taosEncodeBinary(NULL, src, sizeof(double));
...@@ -184,9 +236,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { ...@@ -184,9 +236,9 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
tlen = taosEncodeBinary(dst, src, sizeof(double)); tlen = taosEncodeBinary(dst, src, sizeof(double));
break; break;
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT:
tlen = taosEncodeFixedU64(NULL, *(uint32_t*)src); tlen = taosEncodeFixedU64(NULL, *(uint64_t*)src);
*dst = taosMemoryCalloc(1, tlen + 1); *dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeFixedU64(dst, *(uint32_t*)src); tlen = taosEncodeFixedU64(dst, *(uint64_t*)src);
break; break;
case TSDB_DATA_TYPE_NCHAR: { case TSDB_DATA_TYPE_NCHAR: {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src)); tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
...@@ -215,14 +267,94 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) { ...@@ -215,14 +267,94 @@ int32_t indexConvertData(void* src, int8_t type, void** dst) {
break; break;
} }
*dst = *dst - tlen; *dst = *dst - tlen;
if (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR && type != TSDB_DATA_TYPE_VARBINARY && // indexMayFillNumbericData(*dst, tlen);
type == TSDB_DATA_TYPE_VARCHAR) { return tlen;
uint8_t* p = *dst; }
for (int i = 0; i < tlen; i++) { int32_t indexConvertDataToStr(void* src, int8_t type, void** dst) {
if (p[i] == 0) { int tlen = tDataTypes[type].bytes;
p[i] = (uint8_t)'0';
} switch (type) {
case TSDB_DATA_TYPE_TIMESTAMP:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int64_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
// tlen = taosEncodeFixedU8(NULL, *(uint8_t*)src);
//*dst = taosMemoryCalloc(1, tlen + 1);
// tlen = taosEncodeFixedU8(dst, *(uint8_t*)src);
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint8_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_TINYINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int8_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_SMALLINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int16_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_USMALLINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint16_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_INT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int32_t*)src, *dst, -1);
break;
case TSDB_DATA_TYPE_FLOAT:
tlen = taosEncodeBinary(NULL, src, sizeof(float));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(float));
*dst = *dst - tlen;
break;
case TSDB_DATA_TYPE_UINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint32_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_BIGINT:
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(int64_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_DOUBLE:
tlen = taosEncodeBinary(NULL, src, sizeof(double));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, sizeof(double));
*dst = *dst - tlen;
break;
case TSDB_DATA_TYPE_UBIGINT:
assert(0);
*dst = taosMemoryCalloc(1, sizeof(int64_t) + 1);
indexInt2str(*(uint64_t*)src, *dst, 1);
break;
case TSDB_DATA_TYPE_NCHAR: {
tlen = taosEncodeBinary(NULL, varDataVal(src), varDataLen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, varDataVal(src), varDataLen(src));
*dst = *dst - tlen;
break;
} }
case TSDB_DATA_TYPE_VARCHAR: { // TSDB_DATA_TYPE_BINARY
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
*dst = *dst - tlen;
break;
#endif
}
case TSDB_DATA_TYPE_VARBINARY:
#if 1
tlen = taosEncodeBinary(NULL, src, strlen(src));
*dst = taosMemoryCalloc(1, tlen + 1);
tlen = taosEncodeBinary(dst, src, strlen(src));
*dst = *dst - tlen;
break;
#endif
default:
TASSERT(0);
break;
} }
return tlen; return tlen;
} }
...@@ -1324,7 +1324,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb ...@@ -1324,7 +1324,7 @@ StreamWithStateResult* streamWithStateNextWith(StreamWithState* sws, StreamCallb
if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) { if (FST_NODE_ADDR(p->node) != fstGetRootAddr(sws->fst)) {
taosArrayPop(sws->inp); taosArrayPop(sws->inp);
} }
streamStateDestroy(p); // streamStateDestroy(p);
continue; continue;
} }
FstTransition trn; FstTransition trn;
......
...@@ -410,8 +410,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult* ...@@ -410,8 +410,9 @@ static int32_t tfSearchTerm_JSON(void* reader, SIndexTerm* tem, SIdxTempResult*
ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total); ret = tfileReaderLoadTableIds((TFileReader*)reader, offset, tr->total);
cost = taosGetTimestampUs() - et; cost = taosGetTimestampUs() - et;
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, time cost: %" PRIu64 "us", tem->suid, indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, load all table info, offset: %" PRIu64
tem->colName, tem->colVal, cost); ", size: %d, time cost: %" PRIu64 "us",
tem->suid, tem->colName, tem->colVal, offset, (int)taosArrayGetSize(tr->total), cost);
} }
fstSliceDestroy(&key); fstSliceDestroy(&key);
return 0; return 0;
...@@ -941,7 +942,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray* ...@@ -941,7 +942,7 @@ static int tfileReaderLoadTableIds(TFileReader* reader, int32_t offset, SArray*
// TODO(yihao): opt later // TODO(yihao): opt later
WriterCtx* ctx = reader->ctx; WriterCtx* ctx = reader->ctx;
// add block cache // add block cache
char block[1024] = {0}; char block[4096] = {0};
int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset); int32_t nread = ctx->readFrom(ctx, block, sizeof(block), offset);
assert(nread >= sizeof(uint32_t)); assert(nread >= sizeof(uint32_t));
......
...@@ -56,6 +56,29 @@ class JsonEnv : public ::testing::Test { ...@@ -56,6 +56,29 @@ class JsonEnv : public ::testing::Test {
SIndexJson* index; SIndexJson* index;
}; };
static void WriteData(SIndexJson* index, const std::string& colName, int8_t dtype, void* data, int dlen, int tableId,
int8_t operType = ADD_VALUE) {
SIndexTerm* term =
indexTermCreate(1, (SIndexOperOnColumn)operType, dtype, colName.c_str(), colName.size(), (const char*)data, dlen);
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
tIndexJsonPut(index, terms, (int64_t)tableId);
indexMultiTermDestroy(terms);
}
static void Search(SIndexJson* index, const std::string& colNam, int8_t dtype, void* data, int dlen, int8_t filterType,
SArray** result) {
std::string colName(colNam);
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, dtype, colName.c_str(), colName.size(), (const char*)data, dlen);
SArray* res = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, (EIndexQueryType)filterType);
tIndexJsonSearch(index, mq, res);
indexMultiTermQueryDestroy(mq);
*result = res;
}
TEST_F(JsonEnv, testWrite) { TEST_F(JsonEnv, testWrite) {
{ {
std::string colName("test"); std::string colName("test");
...@@ -204,9 +227,10 @@ TEST_F(JsonEnv, testWriteMillonData) { ...@@ -204,9 +227,10 @@ TEST_F(JsonEnv, testWriteMillonData) {
TEST_F(JsonEnv, testWriteJsonNumberData) { TEST_F(JsonEnv, testWriteJsonNumberData) {
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); // std::string colVal("10");
int val = 10;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -217,9 +241,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -217,9 +241,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test2"); std::string colName("test2");
std::string colVal("20"); int val = 20;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -229,10 +253,10 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -229,10 +253,10 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
{ {
std::string colName("test2"); std::string colName("test");
std::string colVal("15"); int val = 15;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -243,9 +267,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -243,9 +267,9 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test2"); std::string colName("test2");
std::string colVal("15"); const char* val = "test";
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)val, strlen(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -255,12 +279,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -255,12 +279,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
indexMultiTermDestroy(terms); indexMultiTermDestroy(terms);
} }
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); int val = 15;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM); indexMultiTermQueryAdd(mq, q, QUERY_TERM);
...@@ -270,11 +293,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -270,11 +293,11 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); int val = 15;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
...@@ -284,11 +307,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -284,11 +307,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); int val = 10;
;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(int));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
...@@ -298,11 +322,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -298,11 +322,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
...@@ -312,11 +337,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -312,11 +337,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
{ {
std::string colName("test"); std::string colName("test");
std::string colVal("10"); int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
...@@ -326,12 +352,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) { ...@@ -326,12 +352,12 @@ TEST_F(JsonEnv, testWriteJsonNumberData) {
} }
} }
TEST_F(JsonEnv, testWriteJsonTfileAndCache) { TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT) {
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -355,11 +381,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -355,11 +381,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_TERM); indexMultiTermQueryAdd(mq, q, QUERY_TERM);
...@@ -369,11 +395,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -369,11 +395,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(int));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
...@@ -383,11 +409,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -383,11 +409,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); // std::string colVal("10");
int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
...@@ -397,11 +424,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -397,11 +424,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN); indexMultiTermQueryAdd(mq, q, QUERY_GREATER_THAN);
...@@ -411,11 +438,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -411,11 +438,11 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL); indexMultiTermQueryAdd(mq, q, QUERY_LESS_EQUAL);
...@@ -425,9 +452,10 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -425,9 +452,10 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("other_column"); std::string colName("other_column");
std::string colVal("100"); int val = 100;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size()); (const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate(); SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term); indexMultiTermAdd(terms, term);
...@@ -438,11 +466,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -438,11 +466,12 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
} }
{ {
std::string colName("test1"); std::string colName("test1");
std::string colVal("10"); int val = 10;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST); SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(), colVal.c_str(), SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
colVal.size()); (const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t)); SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN); indexMultiTermQueryAdd(mq, q, QUERY_LESS_THAN);
...@@ -450,4 +479,102 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) { ...@@ -450,4 +479,102 @@ TEST_F(JsonEnv, testWriteJsonTfileAndCache) {
EXPECT_EQ(0, taosArrayGetSize(result)); EXPECT_EQ(0, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq); indexMultiTermQueryDestroy(mq);
} }
{
std::string colName("test1");
int val = 15;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i + 1000);
}
indexMultiTermDestroy(terms);
}
{
std::string colName("test1");
int val = 8;
// std::string colVal("10");
SIndexMultiTermQuery* mq = indexMultiTermQueryCreate(MUST);
SIndexTerm* q = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_INT, colName.c_str(), colName.size(),
(const char*)&val, sizeof(val));
SArray* result = taosArrayInit(1, sizeof(uint64_t));
indexMultiTermQueryAdd(mq, q, QUERY_GREATER_EQUAL);
tIndexJsonSearch(index, mq, result);
EXPECT_EQ(2000, taosArrayGetSize(result));
indexMultiTermQueryDestroy(mq);
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_INT2) {
{
int val = 10;
std::string colName("test1");
for (int i = 0; i < 10000; i++) {
val += 1;
WriteData(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), i);
}
}
{
int val = 10;
std::string colName("test2xxx");
std::string colVal("xxxxxxxxxxxxxxx");
for (int i = 0; i < 100000; i++) {
val += 1;
WriteData(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), i);
}
}
{
SArray* res = NULL;
std::string colName("test1");
int val = 9;
Search(index, colName, TSDB_DATA_TYPE_INT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(10000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test2xxx");
std::string colVal("xxxxxxxxxxxxxxx");
Search(index, colName, TSDB_DATA_TYPE_BINARY, (void*)(colVal.c_str()), colVal.size(), QUERY_TERM, &res);
EXPECT_EQ(100000, taosArrayGetSize(res));
}
}
TEST_F(JsonEnv, testWriteJsonTfileAndCache_FLOAT) {
{
float val = 10.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i);
}
}
{
float val = 2.0;
std::string colName("test1");
for (int i = 0; i < 1000; i++) {
WriteData(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), i);
}
}
{
SArray* res = NULL;
std::string colName("test1");
float val = 1.9;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(2000, taosArrayGetSize(res));
}
{
SArray* res = NULL;
std::string colName("test1");
float val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
{
std::string colName("test1");
SArray* res = NULL;
float val = 2.1;
Search(index, colName, TSDB_DATA_TYPE_FLOAT, &val, sizeof(val), QUERY_GREATER_EQUAL, &res);
EXPECT_EQ(1000, taosArrayGetSize(res));
}
} }
...@@ -6,12 +6,14 @@ ...@@ -6,12 +6,14 @@
#include <vector> #include <vector>
#include "index.h" #include "index.h"
#include "indexCache.h" #include "indexCache.h"
#include "indexComm.h"
#include "indexFst.h" #include "indexFst.h"
#include "indexFstCountingWriter.h" #include "indexFstCountingWriter.h"
#include "indexFstUtil.h" #include "indexFstUtil.h"
#include "indexInt.h" #include "indexInt.h"
#include "indexTfile.h" #include "indexTfile.h"
#include "indexUtil.h" #include "indexUtil.h"
#include "tcoding.h"
#include "tglobal.h" #include "tglobal.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tutil.h" #include "tutil.h"
...@@ -305,3 +307,17 @@ TEST_F(UtilEnv, 01Except) { ...@@ -305,3 +307,17 @@ TEST_F(UtilEnv, 01Except) {
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1); ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 0), 1);
ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100); ASSERT_EQ(*(uint64_t *)taosArrayGet(total, 1), 100);
} }
TEST_F(UtilEnv, testFill) {
for (int i = 0; i < 10000000; i++) {
int64_t val = i;
char buf[65] = {0};
indexInt2str(val, buf, 1);
EXPECT_EQ(val, taosStr2int64(buf));
}
for (int i = 0; i < 10000000; i++) {
int64_t val = 0 - i;
char buf[65] = {0};
indexInt2str(val, buf, -1);
EXPECT_EQ(val, taosStr2int64(buf));
}
}
...@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1 ...@@ -7,7 +7,7 @@ system sh/cfg.sh -n dnode1 -c udf -v 1
print ========= start dnode1 as LEADER print ========= start dnode1 as LEADER
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sleep 2000 sleep 1000
sql connect sql connect
print ======== step1 udf print ======== step1 udf
......
...@@ -220,7 +220,6 @@ if $data[0][4] == LEADER then ...@@ -220,7 +220,6 @@ if $data[0][4] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][3] print ---- vgroup $data[0][0] leader switch to dnode $data[0][3]
elif $data[0][6] == LEADER then elif $data[0][6] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][5] print ---- vgroup $data[0][0] leader switch to dnode $data[0][5]
endi
elif $data[0][8] == LEADER then elif $data[0][8] == LEADER then
print ---- vgroup $data[0][0] leader switch to dnode $data[0][7] print ---- vgroup $data[0][0] leader switch to dnode $data[0][7]
else else
...@@ -342,7 +341,6 @@ elif $data[0][6] == LEADER then ...@@ -342,7 +341,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3 goto check_vg_ready_3
endi endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7] print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then if $data[0][4] == LEADER then
goto check_vg_ready_3 goto check_vg_ready_3
......
...@@ -420,7 +420,6 @@ elif $data[0][6] == LEADER then ...@@ -420,7 +420,6 @@ elif $data[0][6] == LEADER then
goto check_vg_ready_3 goto check_vg_ready_3
endi endi
print ---- vgroup $data[0][0] leader locating dnode $data[0][7] print ---- vgroup $data[0][0] leader locating dnode $data[0][7]
endi
elif $data[0][8] == LEADER then elif $data[0][8] == LEADER then
if $data[0][4] == LEADER then if $data[0][4] == LEADER then
goto check_vg_ready_3 goto check_vg_ready_3
......
...@@ -617,7 +617,7 @@ class TDTestCase: ...@@ -617,7 +617,7 @@ class TDTestCase:
tdLog.info("step 1: create database, stb, ctb and insert data") tdLog.info("step 1: create database, stb, ctb and insert data")
# create and start thread # create and start thread
parameterDict = {'cfg': '', \ parameterDict = {'cfg': '', \
'dbName': 'db60', \ 'dbName': 'db70', \
'vgroups': 4, \ 'vgroups': 4, \
'stbName': 'stb', \ 'stbName': 'stb', \
'ctbNum': 10, \ 'ctbNum': 10, \
...@@ -634,7 +634,7 @@ class TDTestCase: ...@@ -634,7 +634,7 @@ class TDTestCase:
prepareEnvThread.start() prepareEnvThread.start()
parameterDict2 = {'cfg': '', \ parameterDict2 = {'cfg': '', \
'dbName': 'db61', \ 'dbName': 'db71', \
'vgroups': 4, \ 'vgroups': 4, \
'stbName': 'stb2', \ 'stbName': 'stb2', \
'ctbNum': 10, \ 'ctbNum': 10, \
...@@ -708,9 +708,9 @@ class TDTestCase: ...@@ -708,9 +708,9 @@ class TDTestCase:
cfgPath = buildPath + "/../sim/psim/cfg" cfgPath = buildPath + "/../sim/psim/cfg"
tdLog.info("cfgPath: %s" % cfgPath) tdLog.info("cfgPath: %s" % cfgPath)
self.tmqCase1(cfgPath, buildPath) #self.tmqCase1(cfgPath, buildPath)
self.tmqCase2(cfgPath, buildPath) #self.tmqCase2(cfgPath, buildPath)
self.tmqCase3(cfgPath, buildPath) #self.tmqCase3(cfgPath, buildPath)
self.tmqCase4(cfgPath, buildPath) self.tmqCase4(cfgPath, buildPath)
self.tmqCase5(cfgPath, buildPath) self.tmqCase5(cfgPath, buildPath)
self.tmqCase6(cfgPath, buildPath) self.tmqCase6(cfgPath, buildPath)
......
...@@ -57,3 +57,5 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py ...@@ -57,3 +57,5 @@ python3 ./test.py -f 2-query/query_cols_tags_and_or.py
python3 ./test.py -f 2-query/nestedQuery.py python3 ./test.py -f 2-query/nestedQuery.py
python3 ./test.py -f 7-tmq/basic5.py python3 ./test.py -f 7-tmq/basic5.py
python3 ./test.py -f 7-tmq/subscribeDb.py
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册