提交 b1d03c01 编写于 作者: C cpwu

Merge branch '3.0' into cpwu/3.0

...@@ -103,6 +103,7 @@ int32_t create_topic() { ...@@ -103,6 +103,7 @@ int32_t create_topic() {
/*const char* sql = "select * from tu1";*/ /*const char* sql = "select * from tu1";*/
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/
/*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
......
...@@ -216,7 +216,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; ...@@ -216,7 +216,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t;
typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_conf_t tmq_conf_t;
typedef struct tmq_list_t tmq_list_t; typedef struct tmq_list_t tmq_list_t;
// typedef struct tmq_message_t tmq_message_t;
typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *));
...@@ -262,12 +261,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const ...@@ -262,12 +261,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const
DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf);
DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb);
#if 0
// temporary used function for demo only
void tmqShowMsg(tmq_message_t *tmq_message);
int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
#endif
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res);
...@@ -278,12 +271,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); ...@@ -278,12 +271,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res);
#endif #endif
#if 0 #if 0
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message);
DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message);
DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic);
DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(TAOS_RES *res);
#endif #endif
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
#if 0 #if 0
......
...@@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const ...@@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const
*/ */
int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp);
int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num);
......
...@@ -24,16 +24,6 @@ ...@@ -24,16 +24,6 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#if 0
struct tmq_message_t {
SMqPollRsp msg;
char* topic;
SArray* res; // SArray<SReqResultInfo>
int32_t vgId;
int32_t resIter;
};
#endif
typedef struct { typedef struct {
int8_t tmqRspType; int8_t tmqRspType;
int32_t epoch; int32_t epoch;
...@@ -770,105 +760,12 @@ _return: ...@@ -770,105 +760,12 @@ _return:
} }
#endif #endif
static char* formatTimestamp(char* buf, int64_t val, int precision) {
time_t tt;
int32_t ms = 0;
if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
ms = val % 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
ms = val % 1000000;
} else {
tt = (time_t)(val / 1000);
ms = val % 1000;
}
/* comment out as it make testcases like select_with_tags.sim fail.
but in windows, this may cause the call to localtime crash if tt < 0,
need to find a better solution.
if (tt < 0) {
tt = 0;
}
*/
#ifdef WINDOWS
if (tt < 0) tt = 0;
#endif
if (tt <= 0 && ms < 0) {
tt--;
if (precision == TSDB_TIME_PRECISION_NANO) {
ms += 1000000000;
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
ms += 1000000;
} else {
ms += 1000;
}
}
struct tm* ptm = taosLocalTime(&tt, NULL);
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", ms);
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", ms);
} else {
sprintf(buf + pos, ".%03d", ms);
}
return buf;
}
#if 0 #if 0
int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return 0; if (tmq_message == NULL) return 0;
SMqPollRsp* pRsp = &tmq_message->msg; SMqPollRsp* pRsp = &tmq_message->msg;
return pRsp->skipLogNum; return pRsp->skipLogNum;
} }
void tmqShowMsg(tmq_message_t* tmq_message) {
if (tmq_message == NULL) return;
static bool noPrintSchema;
char pBuf[128];
SMqPollRsp* pRsp = &tmq_message->msg;
int32_t colNum = 2;
if (!noPrintSchema) {
printf("|");
for (int32_t i = 0; i < colNum; i++) {
if (i == 0)
printf(" %25s |", pRsp->schema->pSchema[i].name);
else
printf(" %15s |", pRsp->schema->pSchema[i].name);
}
printf("\n");
printf("===============================================\n");
noPrintSchema = true;
}
int32_t sz = taosArrayGetSize(pRsp->pBlockData);
for (int32_t i = 0; i < sz; i++) {
SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i);
int32_t rows = pDataBlock->info.rows;
for (int32_t j = 0; j < rows; j++) {
printf("|");
for (int32_t k = 0; k < colNum; k++) {
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes);
switch (pColInfoData->info.type) {
case TSDB_DATA_TYPE_TIMESTAMP:
formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI);
printf(" %25s |", pBuf);
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
printf(" %15u |", *(uint32_t*)var);
break;
}
}
printf("\n");
}
}
}
#endif #endif
int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
...@@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
} }
tDeleteSMqCMGetSubEpRsp(&rsp); tDeleteSMqCMGetSubEpRsp(&rsp);
} else { } else {
/*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper));
if (pWrapper == NULL) { if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
...@@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { ...@@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
pRspObj->resIter = -1; pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp));
/*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/
pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols);
...@@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { ...@@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) {
} }
} }
#if 0
tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* rspMsg = NULL;
int64_t startTime = taosGetTimestampMs();
int64_t status = atomic_load_64(&tmq->status);
tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT);
while (1) {
rspMsg = tmqSyncPollImpl(tmq, blocking_time);
if (rspMsg && rspMsg->consumeRsp.numOfTopics) {
return rspMsg;
}
if (blocking_time != 0) {
int64_t endTime = taosGetTimestampMs();
if (endTime - startTime > blocking_time) {
return NULL;
}
} else
return NULL;
}
}
#endif
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
SMqRspObj* rspObj; SMqRspObj* rspObj;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
...@@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { ...@@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
} }
} }
#if 0 tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
// TODO
if (blocking_time <= 0) blocking_time = 1; return TMQ_RESP_ERR__SUCCESS;
if (blocking_time > 1000) blocking_time = 1000;
/*blocking_time = 1;*/
if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
/*printf("over1\n");*/
taosMsleep(blocking_time);
return NULL;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
if (taosArrayGetSize(pTopic->vgs) == 0) {
/*printf("over2\n");*/
taosMsleep(blocking_time);
return NULL;
}
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
int32_t beginVgIdx = pTopic->nextVgIdx;
while (1) {
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg);
if (pReq == NULL) {
ASSERT(false);
taosMsleep(blocking_time);
return NULL;
}
SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (param == NULL) {
ASSERT(false);
taosMsleep(blocking_time);
return NULL;
}
param->tmq = tmq;
param->retMsg = &tmq_message;
param->pVg = pVg;
tsem_init(&param->rspSem, 0, 0);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){
.pData = pReq,
.len = sizeof(SMqConsumeReq),
.handle = NULL,
};
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = param;
sendInfo->fp = tmqPollCb;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
tmq->pollCnt++;
tsem_wait(&param->rspSem);
tsem_destroy(&param->rspSem);
taosMemoryFree(param);
if (tmq_message == NULL) {
if (beginVgIdx == pTopic->nextVgIdx) {
taosMsleep(blocking_time);
} else {
continue;
}
}
return tmq_message;
}
/*tsem_wait(&pRequest->body.rspSem);*/
/*if (body != NULL) {*/
/*destroySendMsgInfo(body);*/
/*}*/
/*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/
/*pRequest->code = terrno;*/
/*}*/
/*return pRequest;*/
}
#endif
#if 0
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
if (tmq_topic_vgroup_list != NULL) {
// TODO
}
// TODO: change semaphore to gate
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg);
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam));
if (pParam == NULL) {
continue;
}
pParam->tmq = tmq;
pParam->pVg = pVg;
pParam->async = async;
if (!async) tsem_init(&pParam->rspSem, 0, 0);
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqCommitCb;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
if (!async) tsem_wait(&pParam->rspSem);
}
}
return 0;
} }
#endif
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
const char* tmq_err2str(tmq_resp_err_t err) { const char* tmq_err2str(tmq_resp_err_t err) {
if (err == TMQ_RESP_ERR__SUCCESS) { if (err == TMQ_RESP_ERR__SUCCESS) {
...@@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { ...@@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) {
return -1; return -1;
} }
} }
void tmq_message_destroy(TAOS_RES* res) {
if (res == NULL) return;
if (TD_RES_TMQ(res)) {
SMqRspObj* pRspObj = (SMqRspObj*)res;
}
}
...@@ -415,7 +415,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec ...@@ -415,7 +415,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
if (type == TSDB_DATA_TYPE_BINARY) { if (type == TSDB_DATA_TYPE_BINARY) {
newColData = taosMemoryCalloc(1, charLen + 1); newColData = taosMemoryCalloc(1, charLen + 1);
memcpy(newColData, varDataVal(inputData), charLen); memcpy(newColData, varDataVal(inputData), charLen);
bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, 0); bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(newColData); taosMemoryFree(newColData);
return ret; return ret;
...@@ -429,7 +429,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec ...@@ -429,7 +429,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
newColData[len] = 0; newColData[len] = 0;
bool ret = taosParseTime(newColData, timeVal, len + 1, (int32_t)timePrec, 0); bool ret = taosParseTime(newColData, timeVal, len + 1, (int32_t)timePrec, tsDaylight);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
taosMemoryFree(newColData); taosMemoryFree(newColData);
return ret; return ret;
......
...@@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) { ...@@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) {
dInfo("dnode-mgmt start udfd already called"); dInfo("dnode-mgmt start udfd already called");
return 0; return 0;
} }
pData->startCalled = true;
uv_barrier_init(&pData->barrier, 2); uv_barrier_init(&pData->barrier, 2);
pData->stopping = 0; pData->stopping = 0;
uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_thread_create(&pData->thread, dmWatchUdfd, pDnode);
uv_barrier_wait(&pData->barrier); uv_barrier_wait(&pData->barrier);
pData->startCalled = true;
pData->needCleanUp = true; pData->needCleanUp = true;
return pData->spawnErr; return pData->spawnErr;
} }
......
...@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = { ...@@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = {
static const SPerfsTableSchema topicSchema[] = { static const SPerfsTableSchema topicSchema[] = {
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, /*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, /*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/
}; };
static const SPerfsTableSchema consumerSchema[] = { static const SPerfsTableSchema consumerSchema[] = {
......
...@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) {
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream); // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream); /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
...@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 ...@@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return code; return code;
} }
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark,
STrans *pTrans) {
SNode *pAst = NULL; SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) { if (nodesStringToNode(ast, &pAst) < 0) {
......
...@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj ...@@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq);
static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp);
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
int32_t mndInitTopic(SMnode *pMnode) { int32_t mndInitTopic(SMnode *pMnode) {
...@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) { ...@@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq);
mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp);
// mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
...@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo ...@@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo
return 0; return 0;
} }
static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->pNode; SMnode *pMnode = pReq->pNode;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SMqTopicObj *pTopic = NULL; SMqTopicObj *pTopic = NULL;
int32_t cols = 0;
char *pWrite;
char prefix[TSDB_DB_FNAME_LEN] = {0};
SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); while (numOfRows < rowsCapacity) {
if (pDb == NULL) return 0;
tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN);
strcat(prefix, TS_PATH_DELIMITER);
int32_t prefixLen = (int32_t)strlen(prefix);
while (numOfRows < rows) {
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic);
if (pShow->pIter == NULL) break; if (pShow->pIter == NULL) break;
if (pTopic->dbUid != pDb->uid) { int32_t cols = 0;
if (strncmp(pTopic->name, prefix, prefixLen) != 0) {
mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid);
}
sdbRelease(pSdb, pTopic); char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0};
continue; tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN);
} varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE]));
cols = 0; SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)topicName, false);
char topicName[TSDB_TOPIC_NAME_LEN] = {0}; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
STR_TO_VARSTR(pWrite, topicName);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
*(int64_t *)pWrite = pTopic->createTime; char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE);
cols++; strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql);
varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE]));
colDataAppend(pColInfo, numOfRows, (const char *)sql, false);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; taosMemoryFree(sql);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]);
cols++;
numOfRows++; numOfRows++;
sdbRelease(pSdb, pTopic); sdbRelease(pSdb, pTopic);
} }
mndReleaseDb(pMnode, pDb);
pShow->numOfRows += numOfRows; pShow->numOfRows += numOfRows;
return numOfRows; return numOfRows;
} }
......
...@@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList ...@@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle); bool tqNextDataBlock(STqReadHandle *pHandle);
int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows,
int16_t *pNumOfCols);
// need to reposition // need to reposition
......
...@@ -88,22 +88,12 @@ struct STqReadHandle { ...@@ -88,22 +88,12 @@ struct STqReadHandle {
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pVnodeMeta; SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int32_t> SArray* pColIdList; // SArray<int16_t>
int32_t sver; int32_t sver;
SSchemaWrapper* pSchemaWrapper; SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema; STSchema* pSchema;
}; };
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void* data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
typedef struct { typedef struct {
int16_t ver; int16_t ver;
int16_t action; int16_t action;
...@@ -155,24 +145,26 @@ typedef struct { ...@@ -155,24 +145,26 @@ typedef struct {
char subKey[TSDB_SUBSCRIBE_KEY_LEN]; char subKey[TSDB_SUBSCRIBE_KEY_LEN];
int64_t consumerId; int64_t consumerId;
int32_t epoch; int32_t epoch;
int8_t subType;
int8_t withTbName;
int8_t withSchema;
int8_t withTag;
int8_t withTagSchema;
char* qmsg; char* qmsg;
// SRWLatch lock; // SRWLatch lock;
SWalReadHandle* pReadHandle; SWalReadHandle* pWalReader;
// number should be identical to fetch thread num // number should be identical to fetch thread num
qTaskInfo_t task[4]; STqReadHandle* pStreamReader[4];
qTaskInfo_t task[4];
} STqExec; } STqExec;
struct STQ { struct STQ {
// the collection of groups char* path;
// the handle of meta kvstore // STqMetaStore* tqMeta;
bool writeTrigger; SHashObj* execs; // subKey -> tqExec
char* path; SHashObj* pStreamTasks;
STqMetaStore* tqMeta; SVnode* pVnode;
SHashObj* tqMetaNew; // subKey -> tqExec SWal* pWal;
SHashObj* pStreamTasks;
SVnode* pVnode;
SWal* pWal;
SMeta* pVnodeMeta;
}; };
typedef struct { typedef struct {
...@@ -252,7 +244,7 @@ int tqInit(); ...@@ -252,7 +244,7 @@ int tqInit();
void tqCleanUp(); void tqCleanUp();
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
void tqClose(STQ*); void tqClose(STQ*);
// required by vnode // required by vnode
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version);
......
...@@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); } ...@@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); }
void tqCleanUp() { tqPushMgrCleanUp(); } void tqCleanUp() { tqPushMgrCleanUp(); }
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
STQ* pTq = taosMemoryMalloc(sizeof(STQ)); STQ* pTq = taosMemoryMalloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
...@@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe ...@@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe
pTq->path = strdup(path); pTq->path = strdup(path);
pTq->pVnode = pVnode; pTq->pVnode = pVnode;
pTq->pWal = pWal; pTq->pWal = pWal;
pTq->pVnodeMeta = pVnodeMeta; #if 0
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0); (FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq); taosMemoryFree(pTq);
return NULL; return NULL;
} }
#endif
pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
...@@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi ...@@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
return 0; return 0;
} }
int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); } int tqCommit(STQ* pTq) {
// do nothing
/*return tqStorePersist(pTq->tqMeta);*/
return 0;
}
int32_t tqGetTopicHandleSize(const STqTopic* pTopic) { int32_t tqGetTopicHandleSize(const STqTopic* pTopic) {
return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) + return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) +
...@@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu ...@@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu
} }
for (int j = 0; j < TQ_BUFFER_SIZE; j++) { for (int j = 0; j < TQ_BUFFER_SIZE; j++) {
pTopic->buffer.output[j].status = 0; pTopic->buffer.output[j].status = 0;
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pReadHandle,
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].pReadHandle = pReadHandle;
pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle);
...@@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
int32_t reqEpoch = pReq->epoch; int32_t reqEpoch = pReq->epoch;
int64_t fetchOffset; int64_t fetchOffset;
// get offset to fetch message
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
fetchOffset = walGetFirstVer(pTq->pWal); fetchOffset = walGetFirstVer(pTq->pWal);
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
...@@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey)); STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
ASSERT(pExec); ASSERT(pExec);
int32_t consumerEpoch = atomic_load_32(&pExec->epoch); int32_t consumerEpoch = atomic_load_32(&pExec->epoch);
...@@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
} }
SWalReadHead* pHead; SWalReadHead* pHead;
if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) { if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) {
// TODO: no more log, set timer to wait blocking time // TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and // if data inserted during waiting, launch query and
// response to user // response to user
...@@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
if (pHead->msgType == TDMT_VND_SUBMIT) { if (pHead->msgType == TDMT_VND_SUBMIT) {
SSubmitReq* pCont = (SSubmitReq*)&pHead->body; SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
qTaskInfo_t task = pExec->task[workerId]; if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
ASSERT(task); qTaskInfo_t task = pExec->task[workerId];
qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); ASSERT(task);
while (1) { qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK);
SSDataBlock* pDataBlock = NULL; while (1) {
uint64_t ts = 0; SSDataBlock* pDataBlock = NULL;
if (qExecTask(task, &pDataBlock, &ts) < 0) { uint64_t ts = 0;
ASSERT(0); if (qExecTask(task, &pDataBlock, &ts) < 0) {
ASSERT(0);
}
if (pDataBlock == NULL) break;
ASSERT(pDataBlock->info.rows != 0);
ASSERT(pDataBlock->info.numOfCols != 0);
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock);
void* buf = taosMemoryCalloc(1, dataStrLen);
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->useconds = ts;
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
pRetrieve->compressed = 0;
pRetrieve->completed = 1;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows);
// TODO enable compress
int32_t actualLen = 0;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
} }
if (pDataBlock == NULL) break; } else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
STqReadHandle* pReader = pExec->pStreamReader[workerId];
ASSERT(pDataBlock->info.rows != 0); tqReadHandleSetMsg(pReader, pCont, 0);
ASSERT(pDataBlock->info.numOfCols != 0); while (tqNextDataBlock(pReader)) {
SSDataBlock block = {0};
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows,
void* buf = taosMemoryCalloc(1, dataStrLen); &block.info.numOfCols) < 0) {
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; ASSERT(0);
pRetrieve->useconds = ts; }
pRetrieve->precision = TSDB_DEFAULT_PRECISION; int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block);
pRetrieve->compressed = 0; void* buf = taosMemoryCalloc(1, dataStrLen);
pRetrieve->completed = 1; SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
pRetrieve->numOfRows = htonl(pDataBlock->info.rows); /*pRetrieve->useconds = 0;*/
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
// TODO enable compress pRetrieve->compressed = 0;
int32_t actualLen = 0; pRetrieve->completed = 1;
blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); pRetrieve->numOfRows = htonl(block.info.rows);
actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); // TODO enable compress
taosArrayPush(rsp.blockDataLen, &actualLen); int32_t actualLen = 0;
taosArrayPush(rsp.blockData, &buf); blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false);
rsp.blockNum++; actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen);
taosArrayPush(rsp.blockDataLen, &actualLen);
taosArrayPush(rsp.blockData, &buf);
rsp.blockNum++;
}
} else {
ASSERT(0);
} }
} }
// TODO batch optimization // TODO batch optimization:
// TODO continue scan until meeting batch requirement
if (rsp.blockNum != 0) break; if (rsp.blockNum != 0) break;
rsp.skipLogNum++; rsp.skipLogNum++;
fetchOffset++; fetchOffset++;
...@@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ...@@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: persist meta into tdb // TODO: persist meta into tdb
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
SMqRebVgReq req; SMqRebVgReq req = {0};
tDecodeSMqRebVgReq(msg, &req); tDecodeSMqRebVgReq(msg, &req);
// todo lock // todo lock
STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey)); STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey));
if (pExec == NULL) { if (pExec == NULL) {
ASSERT(req.oldConsumerId == -1); ASSERT(req.oldConsumerId == -1);
ASSERT(req.newConsumerId != -1); ASSERT(req.newConsumerId != -1);
...@@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN);
pExec->consumerId = req.newConsumerId; pExec->consumerId = req.newConsumerId;
pExec->epoch = -1; pExec->epoch = -1;
pExec->subType = req.subType;
pExec->withTbName = req.withTbName;
pExec->withSchema = req.withSchema;
pExec->withTag = req.withTag;
pExec->withTagSchema = req.withTagSchema;
pExec->qmsg = req.qmsg; pExec->qmsg = req.qmsg;
req.qmsg = NULL; req.qmsg = NULL;
pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal);
pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal);
for (int32_t i = 0; i < 4; i++) { for (int32_t i = 0; i < 4; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pExec->pStreamReader[i],
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle);
ASSERT(pExec->task[i]); ASSERT(pExec->task[i]);
} }
taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec));
return 0; return 0;
} else { } else {
/*if (req.newConsumerId != -1) {*/ /*if (req.newConsumerId != -1) {*/
...@@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { ...@@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
return -1; return -1;
} }
for (int32_t i = 0; i < parallel; i++) { for (int32_t i = 0; i < parallel; i++) {
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);
SReadHandle handle = { SReadHandle handle = {
.reader = pReadHandle, .reader = pStreamReader,
.meta = pTq->pVnodeMeta, .meta = pTq->pVnode->pMeta,
}; };
pTask->exec.runners[i].inputHandle = pReadHandle; pTask->exec.runners[i].inputHandle = pStreamReader;
pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
ASSERT(pTask->exec.runners[i].executor); ASSERT(pTask->exec.runners[i].executor);
} }
......
...@@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { ...@@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
return false; return false;
} }
int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) { int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows,
int16_t* pNumOfCols) {
/*int32_t sversion = pHandle->pBlock->sversion;*/ /*int32_t sversion = pHandle->pBlock->sversion;*/
// TODO set to real sversion // TODO set to real sversion
int32_t sversion = 0; int32_t sversion = 0;
...@@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper;
*pNumOfRows = pHandle->pBlock->numOfRows; *pNumOfRows = pHandle->pBlock->numOfRows;
/*int32_t numOfCols = pHandle->pSchema->numOfCols;*/
int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList);
if (colNumNeed > pSchemaWrapper->nCols) { if (colNumNeed > pSchemaWrapper->nCols) {
...@@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p ...@@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p
} }
int32_t colActual = taosArrayGetSize(*ppCols); int32_t colActual = taosArrayGetSize(*ppCols);
*pNumOfCols = colActual;
// TODO in stream shuffle case, fetch groupId // TODO in stream shuffle case, fetch groupId
*pGroupId = 0; *pGroupId = 0;
......
...@@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { ...@@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
// open tq // open tq
sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR);
pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal);
if (pVnode->pTq == NULL) { if (pVnode->pTq == NULL) {
vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
......
...@@ -560,7 +560,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) ...@@ -560,7 +560,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup)
SArray* pCols = NULL; SArray* pCols = NULL;
uint64_t groupId; uint64_t groupId;
int32_t numOfRows; int32_t numOfRows;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows); int16_t outputCol;
int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &outputCol);
if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { if (code != TSDB_CODE_SUCCESS || numOfRows == 0) {
pTaskInfo->code = code; pTaskInfo->code = code;
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include <stdbool.h> #include <stdbool.h>
#include "tmsg.h" #include "tmsg.h"
#include "tcommon.h" #include "tcommon.h"
#include "function.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn ...@@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf);
// input: block // input: block
// output: resultData // output: resultData
int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output);
/** /**
* tearn down udf * tearn down udf
* @param handle * @param handle
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "uv.h" #include "uv.h"
#include "os.h" #include "os.h"
#include "fnLog.h"
#include "tudf.h" #include "tudf.h"
#include "tudfInt.h" #include "tudfInt.h"
#include "tarray.h" #include "tarray.h"
...@@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { ...@@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) {
return 0; return 0;
} }
int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) {
output->info.rows = input->numOfRows;
output->info.numOfCols = numOfCols;
bool hasVarCol = false;
for (int32_t i = 0; i < numOfCols; ++i) {
if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) {
hasVarCol = true;
break;
}
}
output->info.hasVarCol = hasVarCol;
//TODO: free the array output->pDataBlock
output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
taosArrayPush(output->pDataBlock, input->columnData);
return 0;
}
int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) {
if (input->info.numOfCols != 1) {
fnError("scalar function only support one column");
return -1;
}
output->numOfRows = input->info.rows;
//TODO: memory
output->columnData = taosArrayGet(input->pDataBlock, 0);
return 0;
}
void onUdfcPipeClose(uv_handle_t *handle) { void onUdfcPipeClose(uv_handle_t *handle) {
SClientUvConn *conn = handle->data; SClientUvConn *conn = handle->data;
...@@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn ...@@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn
return err; return err;
} }
// input: block int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) {
// output: resultData
int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) {
int8_t callType = TSDB_UDF_CALL_SCALA_PROC; int8_t callType = TSDB_UDF_CALL_SCALA_PROC;
int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); SSDataBlock inputBlock = {0};
convertScalarParamToDataBlock(input, numOfCols, &inputBlock);
SSDataBlock resultBlock = {0};
int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL);
convertDataBlockToScalarParm(&resultBlock, output);
return err; return err;
} }
......
...@@ -26,15 +26,15 @@ ...@@ -26,15 +26,15 @@
#include "trpc.h" #include "trpc.h"
typedef struct SUdfdContext { typedef struct SUdfdContext {
uv_loop_t *loop; uv_loop_t *loop;
uv_pipe_t ctrlPipe; uv_pipe_t ctrlPipe;
uv_signal_t intrSignal; uv_signal_t intrSignal;
char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN];
uv_pipe_t listeningPipe; uv_pipe_t listeningPipe;
void *clientRpc; void *clientRpc;
uv_mutex_t udfsMutex; uv_mutex_t udfsMutex;
SHashObj* udfsHash; SHashObj *udfsHash;
bool printVersion; bool printVersion;
} SUdfdContext; } SUdfdContext;
...@@ -55,22 +55,17 @@ typedef struct SUvUdfWork { ...@@ -55,22 +55,17 @@ typedef struct SUvUdfWork {
uv_buf_t output; uv_buf_t output;
} SUvUdfWork; } SUvUdfWork;
typedef enum { typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState;
UDF_STATE_INIT = 0,
UDF_STATE_LOADING,
UDF_STATE_READY,
UDF_STATE_UNLOADING
} EUdfState;
typedef struct SUdf { typedef struct SUdf {
int32_t refCount; int32_t refCount;
EUdfState state; EUdfState state;
uv_mutex_t lock; uv_mutex_t lock;
uv_cond_t condReady; uv_cond_t condReady;
char name[16]; char name[16];
int8_t type; int8_t type;
char path[PATH_MAX]; char path[PATH_MAX];
uv_lib_t lib; uv_lib_t lib;
TUdfScalarProcFunc scalarProcFunc; TUdfScalarProcFunc scalarProcFunc;
...@@ -83,24 +78,28 @@ typedef struct SUdfcFuncHandle { ...@@ -83,24 +78,28 @@ typedef struct SUdfcFuncHandle {
SUdf *udf; SUdf *udf;
} SUdfcFuncHandle; } SUdfcFuncHandle;
int32_t udfdLoadUdf(char* udfName, SUdf* udf) { int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf);
strcpy(udf->name, udfName);
int err = uv_dlopen(udf->path, &udf->lib); int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) {
if (err != 0) { strcpy(udf->name, udfName);
fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
// TODO set error udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf);
}
//TODO: find all the functions int err = uv_dlopen(udf->path, &udf->lib);
char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; if (err != 0) {
strcpy(normalFuncName, udfName); fnError("can not load library %s. error: %s", udf->path, uv_strerror(err));
uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); // TODO set error
char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; }
char *freeSuffix = "_free"; // TODO: find all the functions
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); char normalFuncName[TSDB_FUNC_NAME_LEN] = {0};
strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); strcpy(normalFuncName, udfName);
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc));
return 0; char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0};
char *freeSuffix = "_free";
strncpy(freeFuncName, normalFuncName, strlen(normalFuncName));
strncat(freeFuncName, freeSuffix, strlen(freeSuffix));
uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn));
return 0;
} }
void udfdProcessRequest(uv_work_t *req) { void udfdProcessRequest(uv_work_t *req) {
...@@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) {
switch (request.type) { switch (request.type) {
case UDF_TASK_SETUP: { case UDF_TASK_SETUP: {
//TODO: tracable id from client. connect, setup, call, teardown // TODO: tracable id from client. connect, setup, call, teardown
fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName); fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName);
SUdfSetupRequest *setup = &request.setup; SUdfSetupRequest *setup = &request.setup;
SUdf* udf = NULL; SUdf *udf = NULL;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN);
if (*udfInHash) { if (*udfInHash) {
++(*udfInHash)->refCount; ++(*udfInHash)->refCount;
udf = *udfInHash; udf = *udfInHash;
...@@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) {
uv_mutex_lock(&udf->lock); uv_mutex_lock(&udf->lock);
if (udf->state == UDF_STATE_INIT) { if (udf->state == UDF_STATE_INIT) {
udf->state = UDF_STATE_LOADING; udf->state = UDF_STATE_LOADING;
udfdLoadUdf(setup->udfName, udf); udfdLoadUdf(setup->udfName, &setup->epSet, udf);
udf->state = UDF_STATE_READY; udf->state = UDF_STATE_READY;
uv_cond_broadcast(&udf->condReady); uv_cond_broadcast(&udf->condReady);
uv_mutex_unlock(&udf->lock); uv_mutex_unlock(&udf->lock);
...@@ -168,8 +167,9 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -168,8 +167,9 @@ void udfdProcessRequest(uv_work_t *req) {
case UDF_TASK_CALL: { case UDF_TASK_CALL: {
SUdfCallRequest *call = &request.call; SUdfCallRequest *call = &request.call;
fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType,
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); call->udfHandle);
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
SUdfDataBlock input = {0}; SUdfDataBlock input = {0};
...@@ -206,10 +206,10 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -206,10 +206,10 @@ void udfdProcessRequest(uv_work_t *req) {
} }
case UDF_TASK_TEARDOWN: { case UDF_TASK_TEARDOWN: {
SUdfTeardownRequest *teardown = &request.teardown; SUdfTeardownRequest *teardown = &request.teardown;
fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle =
SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); (SUdfcFuncHandle *)(teardown->udfHandle);
SUdf *udf = handle->udf; SUdf *udf = handle->udf;
bool unloadUdf = false; bool unloadUdf = false;
uv_mutex_lock(&global.udfsMutex); uv_mutex_lock(&global.udfsMutex);
udf->refCount--; udf->refCount--;
if (udf->refCount == 0) { if (udf->refCount == 0) {
...@@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) { ...@@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) {
void udfdOnWrite(uv_write_t *req, int status) { void udfdOnWrite(uv_write_t *req, int status) {
SUvUdfWork *work = (SUvUdfWork *)req->data; SUvUdfWork *work = (SUvUdfWork *)req->data;
if (status < 0) { if (status < 0) {
//TODO:log error and process it. // TODO:log error and process it.
} }
fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status));
taosMemoryFree(work->output.base); taosMemoryFree(work->output.base);
...@@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { ...@@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) {
void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; }
int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) { int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) {
SRetrieveFuncReq retrieveReq = {0}; SRetrieveFuncReq retrieveReq = {0};
retrieveReq.numOfFuncs = 1; retrieveReq.numOfFuncs = 1;
retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN);
...@@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu ...@@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu
void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
if (nread < 0) { if (nread < 0) {
fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); fnError("udfd ctrl pipe read error. %s", uv_err_name(nread));
uv_close((uv_handle_t*)q, NULL); uv_close((uv_handle_t *)q, NULL);
uv_stop(global.loop); uv_stop(global.loop);
return; return;
} }
...@@ -515,13 +515,13 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { ...@@ -515,13 +515,13 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
static int32_t removeListeningPipe() { static int32_t removeListeningPipe() {
uv_fs_t req; uv_fs_t req;
int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL);
uv_fs_req_cleanup(&req); uv_fs_req_cleanup(&req);
return err; return err;
} }
static int32_t udfdUvInit() { static int32_t udfdUvInit() {
uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t));
if (loop) { if (loop) {
uv_loop_init(loop); uv_loop_init(loop);
} }
...@@ -529,10 +529,10 @@ static int32_t udfdUvInit() { ...@@ -529,10 +529,10 @@ static int32_t udfdUvInit() {
uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_init(global.loop, &global.ctrlPipe, 1);
uv_pipe_open(&global.ctrlPipe, 0); uv_pipe_open(&global.ctrlPipe, 0);
uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb);
char dnodeId[8] = {0}; char dnodeId[8] = {0};
size_t dnodeIdSize; size_t dnodeIdSize;
int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize);
if (err != 0) { if (err != 0) {
dnodeId[0] = '1'; dnodeId[0] = '1';
...@@ -567,7 +567,7 @@ static int32_t udfdRun() { ...@@ -567,7 +567,7 @@ static int32_t udfdRun() {
global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
uv_mutex_init(&global.udfsMutex); uv_mutex_init(&global.udfsMutex);
//TOOD: client rpc to fetch udf function info from mnode // TOOD: client rpc to fetch udf function info from mnode
if (udfdOpenClientRpc() != 0) { if (udfdOpenClientRpc() != 0) {
fnError("open rpc connection to mnode failure"); fnError("open rpc connection to mnode failure");
return -1; return -1;
...@@ -589,7 +589,7 @@ static int32_t udfdRun() { ...@@ -589,7 +589,7 @@ static int32_t udfdRun() {
return code; return code;
} }
int main(int argc, char* argv[]) { int main(int argc, char *argv[]) {
if (!taosCheckSystemIsSmallEnd()) { if (!taosCheckSystemIsSmallEnd()) {
printf("failed to start since on non-small-end machines\n"); printf("failed to start since on non-small-end machines\n");
return -1; return -1;
......
...@@ -44,12 +44,15 @@ int main(int argc, char *argv[]) { ...@@ -44,12 +44,15 @@ int main(int argc, char *argv[]) {
} }
taosArrayPush(pBlock->pDataBlock, &colInfo); taosArrayPush(pBlock->pDataBlock, &colInfo);
} }
SSDataBlock output = {0};
callUdfScalaProcess(handle, pBlock, &output);
SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0); SScalarParam input = {0};
for (int32_t i = 0; i < output.info.rows; ++i) { input.numOfRows = pBlock->info.rows;
input.columnData = taosArrayGet(pBlock->pDataBlock, 0);
SScalarParam output = {0};
callUdfScalarFunc(handle, &input, 1 , &output);
SColumnInfoData *col = output.columnData;
for (int32_t i = 0; i < output.numOfRows; ++i) {
fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t))); fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t)));
} }
teardownUdf(handle); teardownUdf(handle);
......
...@@ -1326,7 +1326,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32 ...@@ -1326,7 +1326,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32
STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header
tdSRowResetBuf(pBuilder, row); tdSRowResetBuf(pBuilder, row);
// 1. set the parsed value from sql string
for (int c = 0; c < spd->numOfBound; ++c) { for (int c = 0; c < spd->numOfBound; ++c) {
SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1];
...@@ -1408,11 +1407,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, ...@@ -1408,11 +1407,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf,
} }
SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1]; SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1];
if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
if (bind->num != rowNum) { if (bind->num != rowNum) {
return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same");
} }
...@@ -1427,6 +1422,10 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, ...@@ -1427,6 +1422,10 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf,
CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param)); CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, &param));
} else { } else {
if (bind->buffer_type != pColSchema->type) {
return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
}
int32_t colLen = pColSchema->bytes; int32_t colLen = pColSchema->bytes;
if (IS_VAR_DATA_TYPE(pColSchema->type)) { if (IS_VAR_DATA_TYPE(pColSchema->type)) {
colLen = bind->length[r]; colLen = bind->length[r];
......
...@@ -566,6 +566,9 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { ...@@ -566,6 +566,9 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
TSDB_DATA_TYPE_BLOB == rdt.type) { TSDB_DATA_TYPE_BLOB == rdt.type) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName);
} }
if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) {
((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType;
}
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
} else if (nodesIsJsonOp(pOp)){ } else if (nodesIsJsonOp(pOp)){
......
...@@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg); ...@@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp);
int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp);
int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex ...@@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex
DataSinkHandle sinkHandle = NULL; DataSinkHandle sinkHandle = NULL;
SQWTaskCtx *ctx = NULL; SQWTaskCtx *ctx = NULL;
QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo));
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL));
...@@ -1285,23 +1285,51 @@ _return: ...@@ -1285,23 +1285,51 @@ _return:
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
} }
int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0;
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
sch->hbConnInfo.handle = NULL;
sch->hbConnInfo.ahandle = NULL;
QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle);
} else {
QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle);
}
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
qwReleaseScheduler(QW_READ, mgmt);
QW_RET(TSDB_CODE_SUCCESS);
}
int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
int32_t code = 0; int32_t code = 0;
SSchedulerHbRsp rsp = {0}; SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL; SQWSchStatus *sch = NULL;
uint64_t seqId = 0;
void *origHandle = NULL;
memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); if (qwMsg->code) {
QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
}
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
QW_LOCK(QW_WRITE, &sch->hbConnLock); QW_LOCK(QW_WRITE, &sch->hbConnLock);
if (sch->hbConnInfo.handle) { if (sch->hbConnInfo.handle) {
tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER);
} }
memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo));
memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId));
...@@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { ...@@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
_return: _return:
memcpy(&rsp.epId, &req->epId, sizeof(req->epId));
qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code);
if (code) {
tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER);
}
QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code));
QW_RET(TSDB_CODE_SUCCESS); QW_RET(TSDB_CODE_SUCCESS);
......
...@@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
} }
int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq));
if (NULL == req) { if (NULL == req) {
QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq));
...@@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { ...@@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) {
SSchedulerHbReq req = {0};
req.header.vgId = mgmt->nodeId;
req.sId = sId;
int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
void *msg = rpcMallocCont(msgSize);
if (NULL == msg) {
QW_SCH_ELOG("calloc %d failed", msgSize);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) {
QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize);
taosMemoryFree(msg);
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}
SRpcMsg pMsg = {
.handle = pConn->handle,
.ahandle = pConn->ahandle,
.msgType = TDMT_VND_QUERY_HEARTBEAT,
.pCont = msg,
.contLen = sizeof(SSchedulerHbReq),
.code = TSDB_CODE_RPC_NETWORK_UNAVAIL,
};
tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
...@@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { ...@@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
} }
uint64_t sId = req.sId; uint64_t sId = req.sId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code};
qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.handle = pMsg->handle;
qwMsg.connInfo.ahandle = pMsg->ahandle; qwMsg.connInfo.ahandle = pMsg->ahandle;
if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));
}
QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle);
QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req));
......
...@@ -215,8 +215,10 @@ typedef struct SFilterPCtx { ...@@ -215,8 +215,10 @@ typedef struct SFilterPCtx {
} SFilterPCtx; } SFilterPCtx;
typedef struct SFltTreeStat { typedef struct SFltTreeStat {
int32_t code; int32_t code;
bool scalarMode; int8_t precision;
bool scalarMode;
SFilterInfo* info;
} SFltTreeStat; } SFltTreeStat;
typedef struct SFltScalarCtx { typedef struct SFltScalarCtx {
......
...@@ -3364,6 +3364,12 @@ int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool * ...@@ -3364,6 +3364,12 @@ int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool *
filterGetRangeRes(prev, &tra); filterGetRangeRes(prev, &tra);
win->skey = tra.s; win->skey = tra.s;
win->ekey = tra.e; win->ekey = tra.e;
if (FILTER_GET_FLAG(tra.sflag, RANGE_FLG_EXCLUDE)) {
win->skey++;
}
if (FILTER_GET_FLAG(tra.eflag, RANGE_FLG_EXCLUDE)) {
win->ekey--;
}
} }
filterFreeRangeCtx(prev); filterFreeRangeCtx(prev);
...@@ -3492,7 +3498,40 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { ...@@ -3492,7 +3498,40 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { if (QUERY_NODE_VALUE == nodeType(*pNode)) {
if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) {
return DEAL_RES_CONTINUE;
}
SValueNode *valueNode = (SValueNode *)*pNode;
if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type) {
return DEAL_RES_CONTINUE;
}
#if 0
if (stat->precision < 0) {
//TODO
return DEAL_RES_CONTINUE;
}
char *timeStr = valueNode->datum.p;
if (taosParseTime(valueNode->datum.p, &valueNode->datum.i, valueNode->node.resType.bytes, stat->precision, tsDaylight) !=
TSDB_CODE_SUCCESS) {
return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal);
}
TODO
#else
return DEAL_RES_CONTINUE;
#endif
}
if (QUERY_NODE_COLUMN == nodeType(*pNode)) {
SColumnNode *colNode = (SColumnNode *)*pNode;
stat->precision = colNode->node.resType.precision;
return DEAL_RES_CONTINUE;
}
if (QUERY_NODE_NODE_LIST == nodeType(*pNode)) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
...@@ -3656,16 +3695,19 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) ...@@ -3656,16 +3695,19 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options)
info = *pInfo; info = *pInfo;
info->options = options; info->options = options;
SFltTreeStat stat1 = {0}; SFltTreeStat stat = {0};
FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat1)); stat.precision = -1;
stat.info = info;
FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat));
info->scalarMode = stat1.scalarMode; info->scalarMode = stat.scalarMode;
if (!info->scalarMode) { if (!info->scalarMode) {
FLT_ERR_JRET(fltInitFromNode(pNode, info, options)); FLT_ERR_JRET(fltInitFromNode(pNode, info, options));
} else { } else {
info->sclCtx.node = pNode; info->sclCtx.node = pNode;
FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat1)); FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat));
} }
return code; return code;
......
...@@ -1123,20 +1123,20 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -1123,20 +1123,20 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData);
} }
char *input[2]; int32_t numOfRows = 0;
for (int32_t k = 0; k < 2; ++k) { for (int32_t i = 0; i < inputNum; ++i) {
int32_t type = GET_PARAM_TYPE(&pInput[k]); if (pInput[i].numOfRows > numOfRows) {
if (type != TSDB_DATA_TYPE_BIGINT && type != TSDB_DATA_TYPE_TIMESTAMP && numOfRows = pInput[i].numOfRows;
type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) {
return TSDB_CODE_FAILED;
} }
} }
for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { char *input[2];
for (int32_t i = 0; i < numOfRows; ++i) {
bool hasNull = false;
for (int32_t k = 0; k < 2; ++k) { for (int32_t k = 0; k < 2; ++k) {
if (colDataIsNull_s(pInput[0].columnData, i)) { if (colDataIsNull_s(pInput[k].columnData, i)) {
colDataAppendNULL(pOutput->columnData, i); hasNull = true;
continue; break;
} }
int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i; int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i;
...@@ -1178,6 +1178,11 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -1178,6 +1178,11 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
} }
} }
if (hasNull) {
colDataAppendNULL(pOutput->columnData, i);
continue;
}
int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) : int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) :
(timeVal[1] - timeVal[0]); (timeVal[1] - timeVal[0]);
...@@ -1238,7 +1243,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p ...@@ -1238,7 +1243,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p
colDataAppend(pOutput->columnData, i, (char *)&result, false); colDataAppend(pOutput->columnData, i, (char *)&result, false);
} }
pOutput->numOfRows = pInput->numOfRows; pOutput->numOfRows = numOfRows;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -245,7 +245,7 @@ TEST(timerangeTest, greater) { ...@@ -245,7 +245,7 @@ TEST(timerangeTest, greater) {
int32_t code = filterGetTimeRange(opNode1, &win, &isStrict); int32_t code = filterGetTimeRange(opNode1, &win, &isStrict);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(isStrict, true); ASSERT_EQ(isStrict, true);
ASSERT_EQ(win.skey, tsmall); ASSERT_EQ(win.skey, tsmall+1);
ASSERT_EQ(win.ekey, INT64_MAX); ASSERT_EQ(win.ekey, INT64_MAX);
//filterFreeInfo(filter); //filterFreeInfo(filter);
nodesDestroyNode(opNode1); nodesDestroyNode(opNode1);
...@@ -268,6 +268,37 @@ TEST(timerangeTest, greater_and_lower) { ...@@ -268,6 +268,37 @@ TEST(timerangeTest, greater_and_lower) {
flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
//SFilterInfo *filter = NULL;
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
//ASSERT_EQ(code, 0);
STimeWindow win = {0};
bool isStrict = false;
int32_t code = filterGetTimeRange(logicNode, &win, &isStrict);
ASSERT_EQ(isStrict, true);
ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall+1);
ASSERT_EQ(win.ekey, tbig-1);
//filterFreeInfo(filter);
nodesDestroyNode(logicNode);
}
TEST(timerangeTest, greater_equal_and_lower_equal) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL;
bool eRes[5] = {false, false, true, true, true};
SScalarParam res = {0};
int64_t tsmall = 222, tbig = 333;
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall);
flttMakeOpNode(&opNode1, OP_TYPE_GREATER_EQUAL, TSDB_DATA_TYPE_BOOL, pcol, pval);
flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL);
flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig);
flttMakeOpNode(&opNode2, OP_TYPE_LOWER_EQUAL, TSDB_DATA_TYPE_BOOL, pcol, pval);
SNode *list[2] = {0};
list[0] = opNode1;
list[1] = opNode2;
flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2);
//SFilterInfo *filter = NULL; //SFilterInfo *filter = NULL;
//int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP);
//ASSERT_EQ(code, 0); //ASSERT_EQ(code, 0);
...@@ -282,6 +313,7 @@ TEST(timerangeTest, greater_and_lower) { ...@@ -282,6 +313,7 @@ TEST(timerangeTest, greater_and_lower) {
nodesDestroyNode(logicNode); nodesDestroyNode(logicNode);
} }
TEST(timerangeTest, greater_and_lower_not_strict) { TEST(timerangeTest, greater_and_lower_not_strict) {
SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL; SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL;
bool eRes[5] = {false, false, true, true, true}; bool eRes[5] = {false, false, true, true, true};
...@@ -324,8 +356,8 @@ TEST(timerangeTest, greater_and_lower_not_strict) { ...@@ -324,8 +356,8 @@ TEST(timerangeTest, greater_and_lower_not_strict) {
int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict); int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict);
ASSERT_EQ(isStrict, false); ASSERT_EQ(isStrict, false);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
ASSERT_EQ(win.skey, tsmall1); ASSERT_EQ(win.skey, tsmall1+1);
ASSERT_EQ(win.ekey, tbig2); ASSERT_EQ(win.ekey, tbig2-1);
//filterFreeInfo(filter); //filterFreeInfo(filter);
nodesDestroyNode(logicNode1); nodesDestroyNode(logicNode1);
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册