diff --git a/example/src/tmq.c b/example/src/tmq.c index 88fce7f4be59f6d73b2fcebb652d1c1fa0d003bb..46f57799e7499dc77d1a9e1a5cb0f7a75d1e6309 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -103,6 +103,7 @@ int32_t create_topic() { /*const char* sql = "select * from tu1";*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));*/ + /*pRes = taos_query(pConn, "create topic topic_ctb_column as abc1");*/ pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from ct1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); diff --git a/include/client/taos.h b/include/client/taos.h index 55deee4fad2970b6db87a7763ba200c953f04701..2ace01f95ebe2845a47e35b2d9f6508b98bf3128 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -216,7 +216,6 @@ typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; -// typedef struct tmq_message_t tmq_message_t; typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *)); @@ -262,12 +261,6 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); -#if 0 -// temporary used function for demo only -void tmqShowMsg(tmq_message_t *tmq_message); -int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); -#endif - /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); @@ -278,12 +271,8 @@ DLL_EXPORT char *tmq_get_block_table_name(TAOS_RES *res); #endif #if 0 -DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); -DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic); -DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic); -DLL_EXPORT void tmq_message_destroy(TAOS_RES *res); #endif /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ #if 0 diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index f61073c9a5c31ba92752dcec8071a50c78b6999d..b88afcb39df49c518b41968eee74638ca2143ae1 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -213,7 +213,6 @@ int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, void * pTransporter, const */ int32_t catalogGetAllMeta(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); - int32_t catalogGetQnodeList(SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, SArray* pQnodeList); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 14c7e9533d66feef63163758c2685934b4d45833..3a1ba4cebc071bb52a023f67eeca26d6c8772b3a 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -24,16 +24,6 @@ #include "tqueue.h" #include "tref.h" -#if 0 -struct tmq_message_t { - SMqPollRsp msg; - char* topic; - SArray* res; // SArray - int32_t vgId; - int32_t resIter; -}; -#endif - typedef struct { int8_t tmqRspType; int32_t epoch; @@ -770,105 +760,12 @@ _return: } #endif -static char* formatTimestamp(char* buf, int64_t val, int precision) { - time_t tt; - int32_t ms = 0; - if (precision == TSDB_TIME_PRECISION_NANO) { - tt = (time_t)(val / 1000000000); - ms = val % 1000000000; - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - tt = (time_t)(val / 1000000); - ms = val % 1000000; - } else { - tt = (time_t)(val / 1000); - ms = val % 1000; - } - - /* comment out as it make testcases like select_with_tags.sim fail. - but in windows, this may cause the call to localtime crash if tt < 0, - need to find a better solution. - if (tt < 0) { - tt = 0; - } - */ - -#ifdef WINDOWS - if (tt < 0) tt = 0; -#endif - if (tt <= 0 && ms < 0) { - tt--; - if (precision == TSDB_TIME_PRECISION_NANO) { - ms += 1000000000; - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - ms += 1000000; - } else { - ms += 1000; - } - } - - struct tm* ptm = taosLocalTime(&tt, NULL); - size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm); - - if (precision == TSDB_TIME_PRECISION_NANO) { - sprintf(buf + pos, ".%09d", ms); - } else if (precision == TSDB_TIME_PRECISION_MICRO) { - sprintf(buf + pos, ".%06d", ms); - } else { - sprintf(buf + pos, ".%03d", ms); - } - - return buf; -} #if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; SMqPollRsp* pRsp = &tmq_message->msg; return pRsp->skipLogNum; } - -void tmqShowMsg(tmq_message_t* tmq_message) { - if (tmq_message == NULL) return; - - static bool noPrintSchema; - char pBuf[128]; - SMqPollRsp* pRsp = &tmq_message->msg; - int32_t colNum = 2; - if (!noPrintSchema) { - printf("|"); - for (int32_t i = 0; i < colNum; i++) { - if (i == 0) - printf(" %25s |", pRsp->schema->pSchema[i].name); - else - printf(" %15s |", pRsp->schema->pSchema[i].name); - } - printf("\n"); - printf("===============================================\n"); - noPrintSchema = true; - } - int32_t sz = taosArrayGetSize(pRsp->pBlockData); - for (int32_t i = 0; i < sz; i++) { - SSDataBlock* pDataBlock = taosArrayGet(pRsp->pBlockData, i); - int32_t rows = pDataBlock->info.rows; - for (int32_t j = 0; j < rows; j++) { - printf("|"); - for (int32_t k = 0; k < colNum; k++) { - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); - void* var = POINTER_SHIFT(pColInfoData->pData, j * pColInfoData->info.bytes); - switch (pColInfoData->info.type) { - case TSDB_DATA_TYPE_TIMESTAMP: - formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); - printf(" %25s |", pBuf); - break; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: - printf(" %15u |", *(uint32_t*)var); - break; - } - } - printf("\n"); - } - } -} #endif int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { @@ -1049,7 +946,6 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } tDeleteSMqCMGetSubEpRsp(&rsp); } else { - /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/ SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1208,7 +1104,6 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { pRspObj->resIter = -1; memcpy(&pRspObj->rsp, &pWrapper->msg, sizeof(SMqDataBlkRsp)); - /*SRetrieveTableRsp* pRetrieve = taosArrayGetP(pWrapper->msg.blockData, 0);*/ pRspObj->resInfo.totalRows = 0; pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI; setResSchemaInfo(&pRspObj->resInfo, pWrapper->topicHandle->schema.pSchema, pWrapper->topicHandle->schema.nCols); @@ -1355,31 +1250,6 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { } } -#if 0 -tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* rspMsg = NULL; - int64_t startTime = taosGetTimestampMs(); - - int64_t status = atomic_load_64(&tmq->status); - tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - - while (1) { - rspMsg = tmqSyncPollImpl(tmq, blocking_time); - if (rspMsg && rspMsg->consumeRsp.numOfTopics) { - return rspMsg; - } - - if (blocking_time != 0) { - int64_t endTime = taosGetTimestampMs(); - if (endTime - startTime > blocking_time) { - return NULL; - } - } else - return NULL; - } -} -#endif - TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); @@ -1417,137 +1287,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } } -#if 0 - - if (blocking_time <= 0) blocking_time = 1; - if (blocking_time > 1000) blocking_time = 1000; - /*blocking_time = 1;*/ - - if (taosArrayGetSize(tmq->clientTopics) == 0) { - tscDebug("consumer:%ld poll but not assigned", tmq->consumerId); - /*printf("over1\n");*/ - taosMsleep(blocking_time); - return NULL; - } - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx); - if (taosArrayGetSize(pTopic->vgs) == 0) { - /*printf("over2\n");*/ - taosMsleep(blocking_time); - return NULL; - } - - tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); - int32_t beginVgIdx = pTopic->nextVgIdx; - while (1) { - pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); - /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blocking_time, pTopic, pVg); - if (pReq == NULL) { - ASSERT(false); - taosMsleep(blocking_time); - return NULL; - } - - SMqPollCbParam* param = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (param == NULL) { - ASSERT(false); - taosMsleep(blocking_time); - return NULL; - } - param->tmq = tmq; - param->retMsg = &tmq_message; - param->pVg = pVg; - tsem_init(¶m->rspSem, 0, 0); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){ - .pData = pReq, - .len = sizeof(SMqConsumeReq), - .handle = NULL, - }; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = param; - sendInfo->fp = tmqPollCb; - - /*printf("req offset: %ld\n", pReq->offset);*/ - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - tmq->pollCnt++; - - tsem_wait(¶m->rspSem); - tsem_destroy(¶m->rspSem); - taosMemoryFree(param); - - if (tmq_message == NULL) { - if (beginVgIdx == pTopic->nextVgIdx) { - taosMsleep(blocking_time); - } else { - continue; - } - } - - return tmq_message; - } - - /*tsem_wait(&pRequest->body.rspSem);*/ - - /*if (body != NULL) {*/ - /*destroySendMsgInfo(body);*/ - /*}*/ - - /*if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {*/ - /*pRequest->code = terrno;*/ - /*}*/ - - /*return pRequest;*/ -} -#endif - -#if 0 -tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { - if (tmq_topic_vgroup_list != NULL) { - // TODO - } - - // TODO: change semaphore to gate - for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { - SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); - for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, pTopic, pVg); - - SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); - pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; - SMqCommitCbParam* pParam = taosMemoryMalloc(sizeof(SMqCommitCbParam)); - if (pParam == NULL) { - continue; - } - pParam->tmq = tmq; - pParam->pVg = pVg; - pParam->async = async; - if (!async) tsem_init(&pParam->rspSem, 0, 0); - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - sendInfo->requestObjRefId = 0; - sendInfo->param = pParam; - sendInfo->fp = tmqCommitCb; - - int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - - if (!async) tsem_wait(&pParam->rspSem); - } - } - - return 0; +tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { + // TODO + return TMQ_RESP_ERR__SUCCESS; } -#endif - -tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } const char* tmq_err2str(tmq_resp_err_t err) { if (err == TMQ_RESP_ERR__SUCCESS) { @@ -1573,10 +1316,3 @@ int32_t tmq_get_vgroup_id(TAOS_RES* res) { return -1; } } - -void tmq_message_destroy(TAOS_RES* res) { - if (res == NULL) return; - if (TD_RES_TMQ(res)) { - SMqRspObj* pRspObj = (SMqRspObj*)res; - } -} diff --git a/source/common/src/ttime.c b/source/common/src/ttime.c index 2f3677330bafaaa44bf25e538a2ce4018fd962d6..43554aed8b231d79a7f98a9c0cc464681471eccc 100644 --- a/source/common/src/ttime.c +++ b/source/common/src/ttime.c @@ -415,7 +415,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec if (type == TSDB_DATA_TYPE_BINARY) { newColData = taosMemoryCalloc(1, charLen + 1); memcpy(newColData, varDataVal(inputData), charLen); - bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, 0); + bool ret = taosParseTime(newColData, timeVal, charLen, (int32_t)timePrec, tsDaylight); if (ret != TSDB_CODE_SUCCESS) { taosMemoryFree(newColData); return ret; @@ -429,7 +429,7 @@ int32_t convertStringToTimestamp(int16_t type, char *inputData, int64_t timePrec return TSDB_CODE_FAILED; } newColData[len] = 0; - bool ret = taosParseTime(newColData, timeVal, len + 1, (int32_t)timePrec, 0); + bool ret = taosParseTime(newColData, timeVal, len + 1, (int32_t)timePrec, tsDaylight); if (ret != TSDB_CODE_SUCCESS) { taosMemoryFree(newColData); return ret; diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 376f589acdb64e3abd716aa21dae843b28eedeb8..122c121ed8a073d4a51b060dc8abc0b115c9664c 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -307,11 +307,11 @@ int32_t dmStartUdfd(SDnode *pDnode) { dInfo("dnode-mgmt start udfd already called"); return 0; } + pData->startCalled = true; uv_barrier_init(&pData->barrier, 2); pData->stopping = 0; uv_thread_create(&pData->thread, dmWatchUdfd, pDnode); uv_barrier_wait(&pData->barrier); - pData->startCalled = true; pData->needCleanUp = true; return pData->spawnErr; } diff --git a/source/dnode/mnode/impl/src/mndPerfSchema.c b/source/dnode/mnode/impl/src/mndPerfSchema.c index 38e9e792286d41bfcc53f5559df0ade2c264de3b..fe96bcb709dcfd7fc465a6d8a1594f490e32cba4 100644 --- a/source/dnode/mnode/impl/src/mndPerfSchema.c +++ b/source/dnode/mnode/impl/src/mndPerfSchema.c @@ -41,10 +41,10 @@ static const SPerfsTableSchema queriesSchema[] = { static const SPerfsTableSchema topicSchema[] = { {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, + /*{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY},*/ {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + /*{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},*/ }; static const SPerfsTableSchema consumerSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 58e5b6c65b87ae037e00b32403581b5e5f57734a..1062154e023432f0af070d12efa59ec725a431eb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -59,7 +59,7 @@ int32_t mndInitStream(SMnode *pMnode) { /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveStream); - mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream); + /*mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextStream);*/ return sdbSetTable(pMnode->pSdb, table); } @@ -247,7 +247,8 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64 return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, int8_t triggerType, int64_t watermark, + STrans *pTrans) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 22b1b404bb5cb51475776e246a85d36ccfb39d64..f17b8a4d88e32fbb3ba2bbe67abd6c5efd0a6361 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -35,7 +35,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj static int32_t mndProcessCreateTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicReq(SNodeMsg *pReq); static int32_t mndProcessDropTopicInRsp(SNodeMsg *pRsp); -static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows); +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter); int32_t mndInitTopic(SMnode *pMnode) { @@ -51,7 +51,7 @@ int32_t mndInitTopic(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_DROP_TOPIC, mndProcessDropTopicReq); mndSetMsgHandle(pMnode, TDMT_VND_DROP_TOPIC_RSP, mndProcessDropTopicInRsp); - // mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndRetrieveTopic); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_TOPICS, mndCancelGetNextTopic); return sdbSetTable(pMnode->pSdb, table); @@ -511,56 +511,40 @@ static int32_t mndGetNumOfTopics(SMnode *pMnode, char *dbName, int32_t *pNumOfTo return 0; } -static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, char *data, int32_t rows) { +static int32_t mndRetrieveTopic(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) { SMnode *pMnode = pReq->pNode; SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SMqTopicObj *pTopic = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[TSDB_DB_FNAME_LEN] = {0}; - SDbObj *pDb = mndAcquireDb(pMnode, pShow->db); - if (pDb == NULL) return 0; - - tstrncpy(prefix, pShow->db, TSDB_DB_FNAME_LEN); - strcat(prefix, TS_PATH_DELIMITER); - int32_t prefixLen = (int32_t)strlen(prefix); - - while (numOfRows < rows) { + while (numOfRows < rowsCapacity) { pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pTopic); if (pShow->pIter == NULL) break; - if (pTopic->dbUid != pDb->uid) { - if (strncmp(pTopic->name, prefix, prefixLen) != 0) { - mError("Inconsistent topic data, name:%s, db:%s, dbUid:%" PRIu64, pTopic->name, pDb->name, pDb->uid); - } + int32_t cols = 0; - sdbRelease(pSdb, pTopic); - continue; - } + char topicName[TSDB_TOPIC_NAME_LEN + VARSTR_HEADER_SIZE] = {0}; + tstrncpy(&topicName[VARSTR_HEADER_SIZE], pTopic->name, TSDB_TOPIC_NAME_LEN); + varDataSetLen(topicName, strlen(&topicName[VARSTR_HEADER_SIZE])); - cols = 0; + SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)topicName, false); - char topicName[TSDB_TOPIC_NAME_LEN] = {0}; - tstrncpy(topicName, pTopic->name + prefixLen, TSDB_TOPIC_NAME_LEN); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, topicName); - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, numOfRows, (const char *)&pTopic->createTime, false); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - *(int64_t *)pWrite = pTopic->createTime; - cols++; + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + char *sql = taosMemoryCalloc(1, strlen(pTopic->sql) + 1 + VARSTR_HEADER_SIZE); + strcpy(&sql[VARSTR_HEADER_SIZE], pTopic->sql); + varDataSetLen(sql, strlen(&sql[VARSTR_HEADER_SIZE])); + colDataAppend(pColInfo, numOfRows, (const char *)sql, false); - pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pTopic->sql, pShow->bytes[cols]); - cols++; + taosMemoryFree(sql); numOfRows++; sdbRelease(pSdb, pTopic); } - mndReleaseDb(pMnode, pDb); pShow->numOfRows += numOfRows; return numOfRows; } diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2b52d333dad3193841ee107c36ccf2e5a68438f3..a6e7b37b73c2ec250b45647152bc3cedf3fcb724 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -109,7 +109,8 @@ int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); bool tqNextDataBlock(STqReadHandle *pHandle); -int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows); +int32_t tqRetrieveDataBlock(SArray **ppCols, STqReadHandle *pHandle, uint64_t *pGroupId, int32_t *pNumOfRows, + int16_t *pNumOfCols); // need to reposition diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 5e074eeb12826a65ee01298ac96c681563c9f8c2..6f929ce829f3d00715a128cfb208ab1af7a0ea0a 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,22 +88,12 @@ struct STqReadHandle { SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; SMeta* pVnodeMeta; - SArray* pColIdList; // SArray + SArray* pColIdList; // SArray int32_t sver; SSchemaWrapper* pSchemaWrapper; STSchema* pSchema; }; -typedef struct { - int8_t type; - int8_t reserved[7]; - union { - void* data; - int64_t wmTs; - int64_t checkpointId; - }; -} STqStreamToken; - typedef struct { int16_t ver; int16_t action; @@ -155,24 +145,26 @@ typedef struct { char subKey[TSDB_SUBSCRIBE_KEY_LEN]; int64_t consumerId; int32_t epoch; + int8_t subType; + int8_t withTbName; + int8_t withSchema; + int8_t withTag; + int8_t withTagSchema; char* qmsg; // SRWLatch lock; - SWalReadHandle* pReadHandle; + SWalReadHandle* pWalReader; // number should be identical to fetch thread num - qTaskInfo_t task[4]; + STqReadHandle* pStreamReader[4]; + qTaskInfo_t task[4]; } STqExec; struct STQ { - // the collection of groups - // the handle of meta kvstore - bool writeTrigger; - char* path; - STqMetaStore* tqMeta; - SHashObj* tqMetaNew; // subKey -> tqExec - SHashObj* pStreamTasks; - SVnode* pVnode; - SWal* pWal; - SMeta* pVnodeMeta; + char* path; + // STqMetaStore* tqMeta; + SHashObj* execs; // subKey -> tqExec + SHashObj* pStreamTasks; + SVnode* pVnode; + SWal* pWal; }; typedef struct { @@ -252,7 +244,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); // required by vnode int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 510dd324599e44858304c3707f5b1005d1b77a2b..30f75218b42252bca478a9e30c6ad996d273ea00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -19,7 +19,7 @@ int32_t tqInit() { return tqPushMgrInit(); } void tqCleanUp() { tqPushMgrCleanUp(); } -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) { STQ* pTq = taosMemoryMalloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -28,15 +28,16 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, SMe pTq->path = strdup(path); pTq->pVnode = pVnode; pTq->pWal = pWal; - pTq->pVnodeMeta = pVnodeMeta; +#if 0 pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); if (pTq->tqMeta == NULL) { taosMemoryFree(pTq); return NULL; } +#endif - pTq->tqMetaNew = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); + pTq->execs = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); @@ -104,7 +105,11 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi return 0; } -int tqCommit(STQ* pTq) { return tqStorePersist(pTq->tqMeta); } +int tqCommit(STQ* pTq) { + // do nothing + /*return tqStorePersist(pTq->tqMeta);*/ + return 0; +} int32_t tqGetTopicHandleSize(const STqTopic* pTopic) { return strlen(pTopic->topicName) + strlen(pTopic->sql) + strlen(pTopic->physicalPlan) + strlen(pTopic->qmsg) + @@ -219,10 +224,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu } for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + .meta = pTq->pVnode->pMeta, }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); @@ -238,6 +243,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t reqEpoch = pReq->epoch; int64_t fetchOffset; + // get offset to fetch message if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = walGetFirstVer(pTq->pWal); } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { @@ -249,7 +255,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); - STqExec* pExec = taosHashGet(pTq->tqMetaNew, pReq->subKey, strlen(pReq->subKey)); + STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey)); ASSERT(pExec); int32_t consumerEpoch = atomic_load_32(&pExec->epoch); @@ -271,7 +277,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } SWalReadHead* pHead; - if (walReadWithHandle_s(pExec->pReadHandle, fetchOffset, &pHead) < 0) { + if (walReadWithHandle_s(pExec->pWalReader, fetchOffset, &pHead) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user @@ -285,41 +291,73 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; - qTaskInfo_t task = pExec->task[workerId]; - ASSERT(task); - qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); - while (1) { - SSDataBlock* pDataBlock = NULL; - uint64_t ts = 0; - if (qExecTask(task, &pDataBlock, &ts) < 0) { - ASSERT(0); + if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { + qTaskInfo_t task = pExec->task[workerId]; + ASSERT(task); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts = 0; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(0); + } + if (pDataBlock == NULL) break; + + ASSERT(pDataBlock->info.rows != 0); + ASSERT(pDataBlock->info.numOfCols != 0); + + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = ts; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pDataBlock->info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; } - if (pDataBlock == NULL) break; - - ASSERT(pDataBlock->info.rows != 0); - ASSERT(pDataBlock->info.numOfCols != 0); - - int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pDataBlock); - void* buf = taosMemoryCalloc(1, dataStrLen); - SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; - pRetrieve->useconds = ts; - pRetrieve->precision = TSDB_DEFAULT_PRECISION; - pRetrieve->compressed = 0; - pRetrieve->completed = 1; - pRetrieve->numOfRows = htonl(pDataBlock->info.rows); - - // TODO enable compress - int32_t actualLen = 0; - blockCompressEncode(pDataBlock, pRetrieve->data, &actualLen, pDataBlock->info.numOfCols, false); - actualLen += sizeof(SRetrieveTableRsp); - ASSERT(actualLen <= dataStrLen); - taosArrayPush(rsp.blockDataLen, &actualLen); - taosArrayPush(rsp.blockData, &buf); - rsp.blockNum++; + } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { + STqReadHandle* pReader = pExec->pStreamReader[workerId]; + tqReadHandleSetMsg(pReader, pCont, 0); + while (tqNextDataBlock(pReader)) { + SSDataBlock block = {0}; + if (tqRetrieveDataBlock(&block.pDataBlock, pReader, &block.info.groupId, &block.info.rows, + &block.info.numOfCols) < 0) { + ASSERT(0); + } + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(&block); + void* buf = taosMemoryCalloc(1, dataStrLen); + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + /*pRetrieve->useconds = 0;*/ + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(block.info.rows); + + // TODO enable compress + int32_t actualLen = 0; + blockCompressEncode(&block, pRetrieve->data, &actualLen, block.info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(rsp.blockDataLen, &actualLen); + taosArrayPush(rsp.blockData, &buf); + rsp.blockNum++; + } + } else { + ASSERT(0); } } - // TODO batch optimization + // TODO batch optimization: + // TODO continue scan until meeting batch requirement if (rsp.blockNum != 0) break; rsp.skipLogNum++; fetchOffset++; @@ -572,10 +610,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: persist meta into tdb int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { - SMqRebVgReq req; + SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); // todo lock - STqExec* pExec = taosHashGet(pTq->tqMetaNew, req.subKey, strlen(req.subKey)); + STqExec* pExec = taosHashGet(pTq->execs, req.subKey, strlen(req.subKey)); if (pExec == NULL) { ASSERT(req.oldConsumerId == -1); ASSERT(req.newConsumerId != -1); @@ -586,19 +624,27 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { memcpy(pExec->subKey, req.subKey, TSDB_SUBSCRIBE_KEY_LEN); pExec->consumerId = req.newConsumerId; pExec->epoch = -1; + + pExec->subType = req.subType; + pExec->withTbName = req.withTbName; + pExec->withSchema = req.withSchema; + pExec->withTag = req.withTag; + pExec->withTagSchema = req.withTagSchema; + pExec->qmsg = req.qmsg; req.qmsg = NULL; - pExec->pReadHandle = walOpenReadHandle(pTq->pVnode->pWal); + + pExec->pWalReader = walOpenReadHandle(pTq->pVnode->pWal); for (int32_t i = 0; i < 4; i++) { - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); - SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + pExec->pStreamReader[i] = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); + SReadHandle handle = { + .reader = pExec->pStreamReader[i], + .meta = pTq->pVnode->pMeta, }; pExec->task[i] = qCreateStreamExecTaskInfo(pExec->qmsg, &handle); ASSERT(pExec->task[i]); } - taosHashPut(pTq->tqMetaNew, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); + taosHashPut(pTq->execs, req.subKey, strlen(req.subKey), pExec, sizeof(STqExec)); return 0; } else { /*if (req.newConsumerId != -1) {*/ @@ -627,12 +673,12 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) { return -1; } for (int32_t i = 0; i < parallel; i++) { - STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); + STqReadHandle* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); SReadHandle handle = { - .reader = pReadHandle, - .meta = pTq->pVnodeMeta, + .reader = pStreamReader, + .meta = pTq->pVnode->pMeta, }; - pTask->exec.runners[i].inputHandle = pReadHandle; + pTask->exec.runners[i].inputHandle = pStreamReader; pTask->exec.runners[i].executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); ASSERT(pTask->exec.runners[i].executor); } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 02ce6c4aad299c1c569b76a581691bbb91bd1a49..eb45577e0a491ab4ebeb02752f5c07edc3d3bcde 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -82,7 +82,8 @@ bool tqNextDataBlock(STqReadHandle* pHandle) { return false; } -int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows) { +int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* pGroupId, int32_t* pNumOfRows, + int16_t* pNumOfCols) { /*int32_t sversion = pHandle->pBlock->sversion;*/ // TODO set to real sversion int32_t sversion = 0; @@ -104,7 +105,6 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p SSchemaWrapper* pSchemaWrapper = pHandle->pSchemaWrapper; *pNumOfRows = pHandle->pBlock->numOfRows; - /*int32_t numOfCols = pHandle->pSchema->numOfCols;*/ int32_t colNumNeed = taosArrayGetSize(pHandle->pColIdList); if (colNumNeed > pSchemaWrapper->nCols) { @@ -142,6 +142,7 @@ int32_t tqRetrieveDataBlock(SArray** ppCols, STqReadHandle* pHandle, uint64_t* p } int32_t colActual = taosArrayGetSize(*ppCols); + *pNumOfCols = colActual; // TODO in stream shuffle case, fetch groupId *pGroupId = 0; diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 9e4aa714e29ded8854c0d2602561e14162acbb0b..44af83791a642ce31b4541119671626f71ce8625 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -112,7 +112,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { // open tq sprintf(tdir, "%s%s%s", dir, TD_DIRSEP, VNODE_TQ_DIR); - pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal, pVnode->pMeta, vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(tdir, pVnode, pVnode->pWal); if (pVnode->pTq == NULL) { vError("vgId: %d failed to open vnode tq since %s", TD_VID(pVnode), tstrerror(terrno)); goto _err; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce3de02c9fb4685f628374ee53c5f0baf096b9b5..3a9742d48aecd515c360bfbad7324f16fbdfdc42 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -560,7 +560,8 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) SArray* pCols = NULL; uint64_t groupId; int32_t numOfRows; - int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows); + int16_t outputCol; + int32_t code = tqRetrieveDataBlock(&pCols, pInfo->readerHandle, &groupId, &numOfRows, &outputCol); if (code != TSDB_CODE_SUCCESS || numOfRows == 0) { pTaskInfo->code = code; diff --git a/source/libs/function/inc/tudf.h b/source/libs/function/inc/tudf.h index b5c839c811314333f6de390124be05ad02637ae9..5f4f96c4ccc93b84036bb6ae14c1853231b92e10 100644 --- a/source/libs/function/inc/tudf.h +++ b/source/libs/function/inc/tudf.h @@ -21,6 +21,7 @@ #include #include "tmsg.h" #include "tcommon.h" +#include "function.h" #ifdef __cplusplus extern "C" { @@ -118,8 +119,7 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn int32_t callUdfAggMerge(UdfcFuncHandle handle, SUdfInterBuf *interBuf1, SUdfInterBuf *interBuf2, SUdfInterBuf *resultBuf); // input: block // output: resultData -int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData); - +int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam *output); /** * tearn down udf * @param handle diff --git a/source/libs/function/src/tudf.c b/source/libs/function/src/tudf.c index e31a860e85fb8a8ec6c58e605853eb0183f27e2d..317339af04e6f93e16d29b43c1d18bfbaf0db1de 100644 --- a/source/libs/function/src/tudf.c +++ b/source/libs/function/src/tudf.c @@ -14,6 +14,7 @@ */ #include "uv.h" #include "os.h" +#include "fnLog.h" #include "tudf.h" #include "tudfInt.h" #include "tarray.h" @@ -557,6 +558,34 @@ int32_t convertUdfColumnToDataBlock(SUdfColumn *udfCol, SSDataBlock *block) { return 0; } +int32_t convertScalarParamToDataBlock(SScalarParam *input, int32_t numOfCols, SSDataBlock *output) { + output->info.rows = input->numOfRows; + output->info.numOfCols = numOfCols; + bool hasVarCol = false; + for (int32_t i = 0; i < numOfCols; ++i) { + if (IS_VAR_DATA_TYPE((input+i)->columnData->info.type)) { + hasVarCol = true; + break; + } + } + output->info.hasVarCol = hasVarCol; + + //TODO: free the array output->pDataBlock + output->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + taosArrayPush(output->pDataBlock, input->columnData); + return 0; +} + +int32_t convertDataBlockToScalarParm(SSDataBlock *input, SScalarParam *output) { + if (input->info.numOfCols != 1) { + fnError("scalar function only support one column"); + return -1; + } + output->numOfRows = input->info.rows; + //TODO: memory + output->columnData = taosArrayGet(input->pDataBlock, 0); + return 0; +} void onUdfcPipeClose(uv_handle_t *handle) { SClientUvConn *conn = handle->data; @@ -1108,11 +1137,13 @@ int32_t callUdfAggFinalize(UdfcFuncHandle handle, SUdfInterBuf *interBuf, SUdfIn return err; } -// input: block -// output: resultData -int32_t callUdfScalaProcess(UdfcFuncHandle handle, SSDataBlock *block, SSDataBlock *resultData) { +int32_t callUdfScalarFunc(UdfcFuncHandle handle, SScalarParam *input, int32_t numOfCols, SScalarParam* output) { int8_t callType = TSDB_UDF_CALL_SCALA_PROC; - int32_t err = callUdf(handle, callType, block, NULL, NULL, resultData, NULL); + SSDataBlock inputBlock = {0}; + convertScalarParamToDataBlock(input, numOfCols, &inputBlock); + SSDataBlock resultBlock = {0}; + int32_t err = callUdf(handle, callType, &inputBlock, NULL, NULL, &resultBlock, NULL); + convertDataBlockToScalarParm(&resultBlock, output); return err; } diff --git a/source/libs/function/src/udfd.c b/source/libs/function/src/udfd.c index d6e7a436660d4f4fc5d8dc33fd5de6d7c7eaa9c6..5f7532da87c1ceeb313a6eefe104219e0b3690d9 100644 --- a/source/libs/function/src/udfd.c +++ b/source/libs/function/src/udfd.c @@ -26,15 +26,15 @@ #include "trpc.h" typedef struct SUdfdContext { - uv_loop_t *loop; - uv_pipe_t ctrlPipe; + uv_loop_t *loop; + uv_pipe_t ctrlPipe; uv_signal_t intrSignal; - char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; - uv_pipe_t listeningPipe; - void *clientRpc; + char listenPipeName[UDF_LISTEN_PIPE_NAME_LEN]; + uv_pipe_t listeningPipe; + void *clientRpc; uv_mutex_t udfsMutex; - SHashObj* udfsHash; + SHashObj *udfsHash; bool printVersion; } SUdfdContext; @@ -55,22 +55,17 @@ typedef struct SUvUdfWork { uv_buf_t output; } SUvUdfWork; -typedef enum { - UDF_STATE_INIT = 0, - UDF_STATE_LOADING, - UDF_STATE_READY, - UDF_STATE_UNLOADING -} EUdfState; +typedef enum { UDF_STATE_INIT = 0, UDF_STATE_LOADING, UDF_STATE_READY, UDF_STATE_UNLOADING } EUdfState; typedef struct SUdf { - int32_t refCount; - EUdfState state; + int32_t refCount; + EUdfState state; uv_mutex_t lock; - uv_cond_t condReady; + uv_cond_t condReady; char name[16]; int8_t type; - char path[PATH_MAX]; + char path[PATH_MAX]; uv_lib_t lib; TUdfScalarProcFunc scalarProcFunc; @@ -83,24 +78,28 @@ typedef struct SUdfcFuncHandle { SUdf *udf; } SUdfcFuncHandle; -int32_t udfdLoadUdf(char* udfName, SUdf* udf) { - strcpy(udf->name, udfName); +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf); - int err = uv_dlopen(udf->path, &udf->lib); - if (err != 0) { - fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); - // TODO set error - } - //TODO: find all the functions - char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; - strcpy(normalFuncName, udfName); - uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); - char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; - char *freeSuffix = "_free"; - strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); - strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); - uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); - return 0; +int32_t udfdLoadUdf(char *udfName, SEpSet *pEpSet, SUdf *udf) { + strcpy(udf->name, udfName); + + udfdFillUdfInfoFromMNode(global.clientRpc, pEpSet, udf->name, udf); + + int err = uv_dlopen(udf->path, &udf->lib); + if (err != 0) { + fnError("can not load library %s. error: %s", udf->path, uv_strerror(err)); + // TODO set error + } + // TODO: find all the functions + char normalFuncName[TSDB_FUNC_NAME_LEN] = {0}; + strcpy(normalFuncName, udfName); + uv_dlsym(&udf->lib, normalFuncName, (void **)(&udf->scalarProcFunc)); + char freeFuncName[TSDB_FUNC_NAME_LEN + 6] = {0}; + char *freeSuffix = "_free"; + strncpy(freeFuncName, normalFuncName, strlen(normalFuncName)); + strncat(freeFuncName, freeSuffix, strlen(freeSuffix)); + uv_dlsym(&udf->lib, freeFuncName, (void **)(&udf->freeUdfColumn)); + return 0; } void udfdProcessRequest(uv_work_t *req) { @@ -110,13 +109,13 @@ void udfdProcessRequest(uv_work_t *req) { switch (request.type) { case UDF_TASK_SETUP: { - //TODO: tracable id from client. connect, setup, call, teardown - fnInfo("%"PRId64" setup request. udf name: %s", request.seqNum, request.setup.udfName); + // TODO: tracable id from client. connect, setup, call, teardown + fnInfo("%" PRId64 " setup request. udf name: %s", request.seqNum, request.setup.udfName); SUdfSetupRequest *setup = &request.setup; - SUdf* udf = NULL; + SUdf *udf = NULL; uv_mutex_lock(&global.udfsMutex); - SUdf** udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); + SUdf **udfInHash = taosHashGet(global.udfsHash, request.setup.udfName, TSDB_FUNC_NAME_LEN); if (*udfInHash) { ++(*udfInHash)->refCount; udf = *udfInHash; @@ -136,7 +135,7 @@ void udfdProcessRequest(uv_work_t *req) { uv_mutex_lock(&udf->lock); if (udf->state == UDF_STATE_INIT) { udf->state = UDF_STATE_LOADING; - udfdLoadUdf(setup->udfName, udf); + udfdLoadUdf(setup->udfName, &setup->epSet, udf); udf->state = UDF_STATE_READY; uv_cond_broadcast(&udf->condReady); uv_mutex_unlock(&udf->lock); @@ -168,8 +167,9 @@ void udfdProcessRequest(uv_work_t *req) { case UDF_TASK_CALL: { SUdfCallRequest *call = &request.call; - fnDebug("%"PRId64 "call request. call type %d, handle: %"PRIx64, request.seqNum, call->callType, call->udfHandle); - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); + fnDebug("%" PRId64 "call request. call type %d, handle: %" PRIx64, request.seqNum, call->callType, + call->udfHandle); + SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(call->udfHandle); SUdf *udf = handle->udf; SUdfDataBlock input = {0}; @@ -206,10 +206,10 @@ void udfdProcessRequest(uv_work_t *req) { } case UDF_TASK_TEARDOWN: { SUdfTeardownRequest *teardown = &request.teardown; - fnInfo("teardown. %"PRId64"handle:%"PRIx64, request.seqNum, teardown->udfHandle) - SUdfcFuncHandle *handle = (SUdfcFuncHandle *)(teardown->udfHandle); - SUdf *udf = handle->udf; - bool unloadUdf = false; + fnInfo("teardown. %" PRId64 "handle:%" PRIx64, request.seqNum, teardown->udfHandle) SUdfcFuncHandle *handle = + (SUdfcFuncHandle *)(teardown->udfHandle); + SUdf *udf = handle->udf; + bool unloadUdf = false; uv_mutex_lock(&global.udfsMutex); udf->refCount--; if (udf->refCount == 0) { @@ -250,7 +250,7 @@ void udfdProcessRequest(uv_work_t *req) { void udfdOnWrite(uv_write_t *req, int status) { SUvUdfWork *work = (SUvUdfWork *)req->data; if (status < 0) { - //TODO:log error and process it. + // TODO:log error and process it. } fnDebug("send response. length:%zu, status: %s", work->output.len, uv_err_name(status)); taosMemoryFree(work->output.base); @@ -393,7 +393,7 @@ void udfdIntrSignalHandler(uv_signal_t *handle, int signum) { void udfdProcessRpcRsp(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { return; } -int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char* udfName, SUdf* udf) { +int32_t udfdFillUdfInfoFromMNode(void *clientRpc, SEpSet *pEpSet, char *udfName, SUdf *udf) { SRetrieveFuncReq retrieveReq = {0}; retrieveReq.numOfFuncs = 1; retrieveReq.pFuncNames = taosArrayInit(1, TSDB_FUNC_NAME_LEN); @@ -505,7 +505,7 @@ void udfdCtrlAllocBufCb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *bu void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { if (nread < 0) { fnError("udfd ctrl pipe read error. %s", uv_err_name(nread)); - uv_close((uv_handle_t*)q, NULL); + uv_close((uv_handle_t *)q, NULL); uv_stop(global.loop); return; } @@ -515,13 +515,13 @@ void udfdCtrlReadCb(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) { static int32_t removeListeningPipe() { uv_fs_t req; - int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); + int err = uv_fs_unlink(global.loop, &req, global.listenPipeName, NULL); uv_fs_req_cleanup(&req); return err; } static int32_t udfdUvInit() { - uv_loop_t* loop = taosMemoryMalloc(sizeof(uv_loop_t)); + uv_loop_t *loop = taosMemoryMalloc(sizeof(uv_loop_t)); if (loop) { uv_loop_init(loop); } @@ -529,10 +529,10 @@ static int32_t udfdUvInit() { uv_pipe_init(global.loop, &global.ctrlPipe, 1); uv_pipe_open(&global.ctrlPipe, 0); - uv_read_start((uv_stream_t*)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); + uv_read_start((uv_stream_t *)&global.ctrlPipe, udfdCtrlAllocBufCb, udfdCtrlReadCb); - char dnodeId[8] = {0}; - size_t dnodeIdSize; + char dnodeId[8] = {0}; + size_t dnodeIdSize; int32_t err = uv_os_getenv("DNODE_ID", dnodeId, &dnodeIdSize); if (err != 0) { dnodeId[0] = '1'; @@ -567,7 +567,7 @@ static int32_t udfdRun() { global.udfsHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); uv_mutex_init(&global.udfsMutex); - //TOOD: client rpc to fetch udf function info from mnode + // TOOD: client rpc to fetch udf function info from mnode if (udfdOpenClientRpc() != 0) { fnError("open rpc connection to mnode failure"); return -1; @@ -589,7 +589,7 @@ static int32_t udfdRun() { return code; } -int main(int argc, char* argv[]) { +int main(int argc, char *argv[]) { if (!taosCheckSystemIsSmallEnd()) { printf("failed to start since on non-small-end machines\n"); return -1; diff --git a/source/libs/function/test/runUdf.c b/source/libs/function/test/runUdf.c index fb9c3c678af8dd793aea5da5f99f70c482cadedf..0727a4a1d26db2678c1bb63a7c193e0a91fd2321 100644 --- a/source/libs/function/test/runUdf.c +++ b/source/libs/function/test/runUdf.c @@ -44,12 +44,15 @@ int main(int argc, char *argv[]) { } taosArrayPush(pBlock->pDataBlock, &colInfo); } - - SSDataBlock output = {0}; - callUdfScalaProcess(handle, pBlock, &output); - SColumnInfoData *col = taosArrayGet(output.pDataBlock, 0); - for (int32_t i = 0; i < output.info.rows; ++i) { + SScalarParam input = {0}; + input.numOfRows = pBlock->info.rows; + input.columnData = taosArrayGet(pBlock->pDataBlock, 0); + SScalarParam output = {0}; + callUdfScalarFunc(handle, &input, 1 , &output); + + SColumnInfoData *col = output.columnData; + for (int32_t i = 0; i < output.numOfRows; ++i) { fprintf(stderr, "%d\t%d\n" , i, *(int32_t*)(col->pData + i *sizeof(int32_t))); } teardownUdf(handle); diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 045f54cdb1e275632d44f736d01d4432806234e6..25f978d98819c10b983b8c069d1793b42dc4ed77 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1326,7 +1326,6 @@ int32_t qBindStmtColsValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32 STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); - // 1. set the parsed value from sql string for (int c = 0; c < spd->numOfBound; ++c) { SSchema* pColSchema = &pSchema[spd->boundColumns[c] - 1]; @@ -1408,11 +1407,7 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, } SSchema* pColSchema = &pSchema[spd->boundColumns[colIdx] - 1]; - - if (bind->buffer_type != pColSchema->type) { - return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); - } - + if (bind->num != rowNum) { return buildInvalidOperationMsg(&pBuf, "row number in each bind param should be the same"); } @@ -1427,6 +1422,10 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, CHECK_CODE(MemRowAppend(&pBuf, NULL, 0, ¶m)); } else { + if (bind->buffer_type != pColSchema->type) { + return buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type"); + } + int32_t colLen = pColSchema->bytes; if (IS_VAR_DATA_TYPE(pColSchema->type)) { colLen = bind->length[r]; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 40860aaaf89a6b932f54d41be180e56634dd6a00..2aedc37c24cb6a266bb6388ebf03f69179f8feb5 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -566,6 +566,9 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) { TSDB_DATA_TYPE_BLOB == rdt.type) { return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, ((SExprNode*)(pOp->pRight))->aliasName); } + if (OP_TYPE_IN == pOp->opType || OP_TYPE_NOT_IN == pOp->opType) { + ((SExprNode*)pOp->pRight)->resType = ((SExprNode*)pOp->pLeft)->resType; + } pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; } else if (nodesIsJsonOp(pOp)){ diff --git a/source/libs/qworker/inc/qworkerMsg.h b/source/libs/qworker/inc/qworkerMsg.h index c0f4d6d1574c0e9b843d600ac81c84a2a6264f50..fb1950d16bb6dafcbf8c44537e94a33d51b58cf3 100644 --- a/source/libs/qworker/inc/qworkerMsg.h +++ b/source/libs/qworker/inc/qworkerMsg.h @@ -43,7 +43,8 @@ void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwGetSchTasksStatus(SQWorkerMgmt *mgmt, uint64_t sId, SSchedulerStatusRsp **rsp); int32_t qwBuildAndSendHbRsp(SQWConnInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); -int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index f97d7ffae0b344716f4947f436bb89e0ad665fd0..a74c28cf5395c7e37b382654e89234d5e8f93ba1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -948,7 +948,7 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex DataSinkHandle sinkHandle = NULL; SQWTaskCtx *ctx = NULL; - QW_ERR_JRET(qwRegisterBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); + QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_QUERY, &input, NULL)); @@ -1285,23 +1285,51 @@ _return: QW_RET(TSDB_CODE_SUCCESS); } +int32_t qwProcessHbLinkBroken(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { + int32_t code = 0; + SSchedulerHbRsp rsp = {0}; + SQWSchStatus *sch = NULL; + + QW_ERR_RET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + + QW_LOCK(QW_WRITE, &sch->hbConnLock); + + if (qwMsg->connInfo.handle == sch->hbConnInfo.handle) { + tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); + sch->hbConnInfo.handle = NULL; + sch->hbConnInfo.ahandle = NULL; + + QW_DLOG("release hb handle due to connection broken, handle:%p", qwMsg->connInfo.handle); + } else { + QW_DLOG("ignore hb connection broken, handle:%p, currentHandle:%p", qwMsg->connInfo.handle, sch->hbConnInfo.handle); + } + + QW_UNLOCK(QW_WRITE, &sch->hbConnLock); + + qwReleaseScheduler(QW_READ, mgmt); + + QW_RET(TSDB_CODE_SUCCESS); +} + int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { int32_t code = 0; SSchedulerHbRsp rsp = {0}; SQWSchStatus *sch = NULL; - uint64_t seqId = 0; - void *origHandle = NULL; - memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + if (qwMsg->code) { + QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); + } QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); + QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); + QW_LOCK(QW_WRITE, &sch->hbConnLock); if (sch->hbConnInfo.handle) { tmsgReleaseHandle(sch->hbConnInfo.handle, TAOS_CONN_SERVER); } - + memcpy(&sch->hbConnInfo, &qwMsg->connInfo, sizeof(qwMsg->connInfo)); memcpy(&sch->hbEpId, &req->epId, sizeof(req->epId)); @@ -1314,7 +1342,14 @@ int32_t qwProcessHb(SQWorkerMgmt *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { _return: + memcpy(&rsp.epId, &req->epId, sizeof(req->epId)); + qwBuildAndSendHbRsp(&qwMsg->connInfo, &rsp, code); + + if (code) { + tmsgReleaseHandle(qwMsg->connInfo.handle, TAOS_CONN_SERVER); + } + QW_DLOG("hb rsp send, handle:%p, code:%x - %s", qwMsg->connInfo.handle, code, tstrerror(code)); QW_RET(TSDB_CODE_SUCCESS); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index 15a42d3a3157411ba3a9bbbdaa6d0576d41ce7d4..6723eb21c1c6ad96db086fc7e08c9826249fc5c7 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -286,7 +286,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { } -int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { +int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { STaskDropReq * req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); if (NULL == req) { QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); @@ -313,6 +313,42 @@ int32_t qwRegisterBrokenLinkArg(QW_FPARAMS_DEF, SQWConnInfo *pConn) { return TSDB_CODE_SUCCESS; } +int32_t qwRegisterHbBrokenLinkArg(SQWorkerMgmt *mgmt, uint64_t sId, SQWConnInfo *pConn) { + SSchedulerHbReq req = {0}; + req.header.vgId = mgmt->nodeId; + req.sId = sId; + + int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req); + if (msgSize < 0) { + QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + void *msg = rpcMallocCont(msgSize); + if (NULL == msg) { + QW_SCH_ELOG("calloc %d failed", msgSize); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + if (tSerializeSSchedulerHbReq(msg, msgSize, &req) < 0) { + QW_SCH_ELOG("tSerializeSSchedulerHbReq hbReq failed, size:%d", msgSize); + taosMemoryFree(msg); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + SRpcMsg pMsg = { + .handle = pConn->handle, + .ahandle = pConn->ahandle, + .msgType = TDMT_VND_QUERY_HEARTBEAT, + .pCont = msg, + .contLen = sizeof(SSchedulerHbReq), + .code = TSDB_CODE_RPC_NETWORK_UNAVAIL, + }; + + tmsgRegisterBrokenLinkArg(&mgmt->msgCb, &pMsg); + + return TSDB_CODE_SUCCESS; +} + + int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { @@ -587,10 +623,14 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { } uint64_t sId = req.sId; - SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0}; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code}; qwMsg.connInfo.handle = pMsg->handle; qwMsg.connInfo.ahandle = pMsg->ahandle; + if (TSDB_CODE_RPC_NETWORK_UNAVAIL == pMsg->code) { + QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); + } + QW_SCH_DLOG("processHb start, node:%p, handle:%p", node, pMsg->handle); QW_ERR_RET(qwProcessHb(mgmt, &qwMsg, &req)); diff --git a/source/libs/scalar/inc/filterInt.h b/source/libs/scalar/inc/filterInt.h index 2b9e3c7f39f848cdfa3bb1793962b6769e2d0b68..99c33d3165d306bd89cb1411b6de417065b182ea 100644 --- a/source/libs/scalar/inc/filterInt.h +++ b/source/libs/scalar/inc/filterInt.h @@ -215,8 +215,10 @@ typedef struct SFilterPCtx { } SFilterPCtx; typedef struct SFltTreeStat { - int32_t code; - bool scalarMode; + int32_t code; + int8_t precision; + bool scalarMode; + SFilterInfo* info; } SFltTreeStat; typedef struct SFltScalarCtx { diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index 23047c293073299096afc3a0a596d0bfeb73f7aa..e24162bccbd97657d8097309dd38076c9f71f35e 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -3364,6 +3364,12 @@ int32_t filterGetTimeRangeImpl(SFilterInfo *info, STimeWindow *win, bool * filterGetRangeRes(prev, &tra); win->skey = tra.s; win->ekey = tra.e; + if (FILTER_GET_FLAG(tra.sflag, RANGE_FLG_EXCLUDE)) { + win->skey++; + } + if (FILTER_GET_FLAG(tra.eflag, RANGE_FLG_EXCLUDE)) { + win->ekey--; + } } filterFreeRangeCtx(prev); @@ -3492,7 +3498,40 @@ EDealRes fltReviseRewriter(SNode** pNode, void* pContext) { return DEAL_RES_CONTINUE; } - if (QUERY_NODE_VALUE == nodeType(*pNode) || QUERY_NODE_NODE_LIST == nodeType(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { + if (QUERY_NODE_VALUE == nodeType(*pNode)) { + if (!FILTER_GET_FLAG(stat->info->options, FLT_OPTION_TIMESTAMP)) { + return DEAL_RES_CONTINUE; + } + + SValueNode *valueNode = (SValueNode *)*pNode; + if (TSDB_DATA_TYPE_BINARY != valueNode->node.resType.type) { + return DEAL_RES_CONTINUE; + } + +#if 0 + if (stat->precision < 0) { + //TODO + return DEAL_RES_CONTINUE; + } + + char *timeStr = valueNode->datum.p; + if (taosParseTime(valueNode->datum.p, &valueNode->datum.i, valueNode->node.resType.bytes, stat->precision, tsDaylight) != + TSDB_CODE_SUCCESS) { + return generateDealNodeErrMsg(pCxt, TSDB_CODE_PAR_WRONG_VALUE_TYPE, pVal->literal); + } + TODO +#else + return DEAL_RES_CONTINUE; +#endif + } + + if (QUERY_NODE_COLUMN == nodeType(*pNode)) { + SColumnNode *colNode = (SColumnNode *)*pNode; + stat->precision = colNode->node.resType.precision; + return DEAL_RES_CONTINUE; + } + + if (QUERY_NODE_NODE_LIST == nodeType(*pNode)) { return DEAL_RES_CONTINUE; } @@ -3656,16 +3695,19 @@ int32_t filterInitFromNode(SNode* pNode, SFilterInfo **pInfo, uint32_t options) info = *pInfo; info->options = options; - SFltTreeStat stat1 = {0}; - FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat1)); + SFltTreeStat stat = {0}; + stat.precision = -1; + stat.info = info; + + FLT_ERR_JRET(fltReviseNodes(info, &pNode, &stat)); - info->scalarMode = stat1.scalarMode; + info->scalarMode = stat.scalarMode; if (!info->scalarMode) { FLT_ERR_JRET(fltInitFromNode(pNode, info, options)); } else { info->sclCtx.node = pNode; - FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat1)); + FLT_ERR_JRET(fltOptimizeNodes(info, &info->sclCtx.node, &stat)); } return code; diff --git a/source/libs/scalar/src/sclfunc.c b/source/libs/scalar/src/sclfunc.c index 53e108bb17a488a840cbeafe52061409967373a6..3fa4797273dafd1223cc5973af2a250fe549d0f4 100644 --- a/source/libs/scalar/src/sclfunc.c +++ b/source/libs/scalar/src/sclfunc.c @@ -1123,20 +1123,20 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p GET_TYPED_DATA(timeUnit, int64_t, GET_PARAM_TYPE(&pInput[2]), pInput[2].columnData->pData); } - char *input[2]; - for (int32_t k = 0; k < 2; ++k) { - int32_t type = GET_PARAM_TYPE(&pInput[k]); - if (type != TSDB_DATA_TYPE_BIGINT && type != TSDB_DATA_TYPE_TIMESTAMP && - type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR) { - return TSDB_CODE_FAILED; + int32_t numOfRows = 0; + for (int32_t i = 0; i < inputNum; ++i) { + if (pInput[i].numOfRows > numOfRows) { + numOfRows = pInput[i].numOfRows; } } - for (int32_t i = 0; i < pInput[0].numOfRows; ++i) { + char *input[2]; + for (int32_t i = 0; i < numOfRows; ++i) { + bool hasNull = false; for (int32_t k = 0; k < 2; ++k) { - if (colDataIsNull_s(pInput[0].columnData, i)) { - colDataAppendNULL(pOutput->columnData, i); - continue; + if (colDataIsNull_s(pInput[k].columnData, i)) { + hasNull = true; + break; } int32_t rowIdx = (pInput[k].numOfRows == 1) ? 0 : i; @@ -1178,6 +1178,11 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p } } + if (hasNull) { + colDataAppendNULL(pOutput->columnData, i); + continue; + } + int64_t result = (timeVal[0] >= timeVal[1]) ? (timeVal[0] - timeVal[1]) : (timeVal[1] - timeVal[0]); @@ -1238,7 +1243,7 @@ int32_t timeDiffFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *p colDataAppend(pOutput->columnData, i, (char *)&result, false); } - pOutput->numOfRows = pInput->numOfRows; + pOutput->numOfRows = numOfRows; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index c5191947e0c07f960825836596c3e1e6ca9813d3..59c3104e96c0320804ba4f17dd0a013146b27a2d 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -245,7 +245,7 @@ TEST(timerangeTest, greater) { int32_t code = filterGetTimeRange(opNode1, &win, &isStrict); ASSERT_EQ(code, 0); ASSERT_EQ(isStrict, true); - ASSERT_EQ(win.skey, tsmall); + ASSERT_EQ(win.skey, tsmall+1); ASSERT_EQ(win.ekey, INT64_MAX); //filterFreeInfo(filter); nodesDestroyNode(opNode1); @@ -268,6 +268,37 @@ TEST(timerangeTest, greater_and_lower) { flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + //SFilterInfo *filter = NULL; + //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); + //ASSERT_EQ(code, 0); + STimeWindow win = {0}; + bool isStrict = false; + int32_t code = filterGetTimeRange(logicNode, &win, &isStrict); + ASSERT_EQ(isStrict, true); + ASSERT_EQ(code, 0); + ASSERT_EQ(win.skey, tsmall+1); + ASSERT_EQ(win.ekey, tbig-1); + //filterFreeInfo(filter); + nodesDestroyNode(logicNode); +} + +TEST(timerangeTest, greater_equal_and_lower_equal) { + SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode = NULL; + bool eRes[5] = {false, false, true, true, true}; + SScalarParam res = {0}; + int64_t tsmall = 222, tbig = 333; + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tsmall); + flttMakeOpNode(&opNode1, OP_TYPE_GREATER_EQUAL, TSDB_DATA_TYPE_BOOL, pcol, pval); + flttMakeColumnNode(&pcol, NULL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 0, NULL); + flttMakeValueNode(&pval, TSDB_DATA_TYPE_TIMESTAMP, &tbig); + flttMakeOpNode(&opNode2, OP_TYPE_LOWER_EQUAL, TSDB_DATA_TYPE_BOOL, pcol, pval); + SNode *list[2] = {0}; + list[0] = opNode1; + list[1] = opNode2; + + flttMakeLogicNode(&logicNode, LOGIC_COND_TYPE_AND, list, 2); + //SFilterInfo *filter = NULL; //int32_t code = filterInitFromNode(logicNode, &filter, FLT_OPTION_NO_REWRITE|FLT_OPTION_TIMESTAMP); //ASSERT_EQ(code, 0); @@ -282,6 +313,7 @@ TEST(timerangeTest, greater_and_lower) { nodesDestroyNode(logicNode); } + TEST(timerangeTest, greater_and_lower_not_strict) { SNode *pcol = NULL, *pval = NULL, *opNode1 = NULL, *opNode2 = NULL, *logicNode1 = NULL, *logicNode2 = NULL; bool eRes[5] = {false, false, true, true, true}; @@ -324,8 +356,8 @@ TEST(timerangeTest, greater_and_lower_not_strict) { int32_t code = filterGetTimeRange(logicNode1, &win, &isStrict); ASSERT_EQ(isStrict, false); ASSERT_EQ(code, 0); - ASSERT_EQ(win.skey, tsmall1); - ASSERT_EQ(win.ekey, tbig2); + ASSERT_EQ(win.skey, tsmall1+1); + ASSERT_EQ(win.ekey, tbig2-1); //filterFreeInfo(filter); nodesDestroyNode(logicNode1); }