diff --git a/example/src/tstream.c b/example/src/tstream.c index 51578bd27b54eba0a50c3bf5bc18066138ad5b48..8ffa932bd29d6f89441c7f045611f42e058dd872 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -25,7 +25,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/client/taos.h b/include/client/taos.h index 55e1d1c4225f1464257214f7a8bda91e00aec455..dc54b89d04c7d4571eae9cfa17d633b950c70929 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); -DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ @@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); -DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic); +DLL_EXPORT void *tmq_get_topic_schema(tmq_t *tmq, const char *topic); +DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 177fe39397b12efdf3ffd221bf69932ffb0d15ac..4efddde9351729ec5a0ba8b9ffed3cb78f2c12fe 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -62,10 +62,11 @@ typedef struct { } STaskExec; typedef struct { - int8_t reserved; + int32_t taskId; } STaskDispatcherInplace; typedef struct { + int32_t taskId; int32_t nodeId; SEpSet epSet; } STaskDispatcherFixedEp; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 2a9b3cdf6426a9a1f0fc4b8b7005e5d05fc3c60b..8eaca6853d6bfad12136d9fd033f084766308f22 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value } } - if (strcmp(key, "connection.ip") == 0) { + if (strcmp(key, "td.connect.ip") == 0) { conf->ip = strdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "connection.user") == 0) { + if (strcmp(key, "td.connect.user") == 0) { conf->user = strdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "connection.pass") == 0) { + if (strcmp(key, "td.connect.pass") == 0) { conf->pass = strdup(value); return TMQ_CONF_OK; } - if (strcmp(key, "connection.port") == 0) { + if (strcmp(key, "td.connect.port") == 0) { conf->port = atoi(value); return TMQ_CONF_OK; } - if (strcmp(key, "connection.db") == 0) { + if (strcmp(key, "td.connect.db") == 0) { conf->db = strdup(value); return TMQ_CONF_OK; } @@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { } void tmq_list_destroy(tmq_list_t* list) { - SArray* container = (SArray*)list; + SArray* container = &list->container; /*taosArrayDestroy(container);*/ taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree); } void tmqClearUnhandleMsg(tmq_t* tmq) { - tmq_message_t* msg; + tmq_message_t* msg = NULL; while (1) { taosGetQitem(tmq->qall, (void**)&msg); if (msg) @@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("msg discard %x\n", code); + printf("msg discard, code:%x\n", code); goto WRITE_QUEUE_FAIL; } @@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL: } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { + printf("call update ep %d\n", epoch); bool set = false; int32_t sz = taosArrayGetSize(pRsp->topics); - if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); + SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); for (int32_t i = 0; i < sz; i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); @@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { taosArrayPush(topic.vgs, &clientVg); set = true; } - taosArrayPush(tmq->clientTopics, &topic); + taosArrayPush(newTopics, &topic); } + if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); + tmq->clientTopics = newTopics; atomic_store_32(&tmq->epoch, epoch); return set; } @@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) { /*printf("epoch match\n");*/ SMqClientVg* pVg = rspMsg->vg; + /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ pVg->currentOffset = rspMsg->msg.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); return rspMsg; diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index e97d6e7f116c1c0f084cfa772a56367d2704bee5..7b6d78a60c9762d672b12abd858901945f05d281 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf } } +static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { + SVnodeObj *pVnode = pInfo->ahandle; + SNodeMsg *pMsg = NULL; + + for (int32_t i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pMsg); + + dTrace("msg:%p, will be processed in vnode-merge queue", pMsg); + int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg); + if (code != 0) { + vmSendRsp(pVnode->pWrapper, pMsg, code); + dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + rpcFreeCont(pMsg->rpcMsg.pCont); + taosFreeQitem(pMsg); + } + } +} + static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { SRpcMsg *pRpc = &pMsg->rpcMsg; int32_t code = -1; @@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); - pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeMsg); + pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a4dfd293debf69171b65195062f69b591433f87c..69ee1a569693329765d8bcf4892555fc1d3acf44 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -185,6 +185,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId; pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 91cbc2cff8018b20f7b33c9f21179a61bea2c1a3..4661668cbed50cd6914fa8c22846b290b8cbf803 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq // TODO: error code of buffer pool } #endif - pTq->tqMeta = - tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); + pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, + (FTqDelete)taosMemoryFree, 0); if (pTq->tqMeta == NULL) { taosMemoryFree(pTq); #if 0 @@ -498,12 +498,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { } int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { - SStreamTaskExecReq* pReq = msg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead)); + + SStreamTaskExecReq req; + tDecodeSStreamTaskExecReq(msgstr, &req); + + int32_t taskId = req.taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); ASSERT(pTask); - if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) { + if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) { // TODO } return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 1db17f37cbea660f83f0c2a549e5d9dbfe80b1ea..94e183f5256a92f926870eba892a71e85b91872d 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -66,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_CONSUME: return tqProcessPollReq(pVnode->pTq, pMsg); case TDMT_VND_TASK_EXEC: + case TDMT_VND_TASK_PIPE_EXEC: + case TDMT_VND_TASK_MERGE_EXEC: return tqProcessTaskExec(pVnode->pTq, pMsg); case TDMT_VND_STREAM_TRIGGER: return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 3ec660367363959b72ebd3c3332e30c0899c0fb8..31a06a9d9a6e464a9b6f223712b00c7659133b0c 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -121,7 +121,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { SStreamTaskExecReq req = { .streamId = pTask->streamId, - .taskId = pTask->taskId, + .taskId = pTask->fixedEpDispatcher.taskId, .data = pRes, }; @@ -211,8 +211,9 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { @@ -248,8 +249,9 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { } if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 83c48628a3c85669b807560deb03daa0566927a5..36323cdffa68412ec144f29e543337d2e770077b 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -19,13 +19,13 @@ #include "tref.h" #include "walInt.h" -int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } +int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } -int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; } +int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; } -int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } +int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } -static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { +static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); } @@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { return NULL; } -static inline int64_t walScanLogGetLastVer(SWal* pWal) { +static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { ASSERT(pWal->fileInfoSet != NULL); int sz = taosArrayGetSize(pWal->fileInfoSet); ASSERT(sz > 0); diff --git a/source/libs/wal/src/walSeek.c b/source/libs/wal/src/walSeek.c index 140b7ddc32614662ceb86daae8f3beafb1ff8926..413dcb47f0663b21a38e5642b8d92a18fb88edde 100644 --- a/source/libs/wal/src/walSeek.c +++ b/source/libs/wal/src/walSeek.c @@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) { } int walChangeWrite(SWal* pWal, int64_t ver) { - int code = 0; + int code; TdFilePtr pIdxTFile, pLogTFile; - char fnameStr[WAL_FILE_LEN]; + char fnameStr[WAL_FILE_LEN]; if (pWal->pWriteLogTFile != NULL) { code = taosCloseFile(&pWal->pWriteLogTFile); if (code != 0) { @@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) { return -1; } if (ver < pWal->vers.snapshotVer) { - } if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { code = walChangeWrite(pWal, ver); diff --git a/tests/script/tsim/tmq/basic.sim b/tests/script/tsim/tmq/basic.sim index 876cf7e266d29cb06f4914cafde6c1c38bd5d8e1..1eeec46d5386b6ae89521c59260f8ab84f0748eb 100644 --- a/tests/script/tsim/tmq/basic.sim +++ b/tests/script/tsim/tmq/basic.sim @@ -45,7 +45,7 @@ print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal print cmd result----> $system_content if $system_content != @{consume success: 100}@ then - print not match in pos000 + return -1 endi sql show databases diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 1690a5fb3ee332c78e4647b7ee430623d497fb00..2618984f51d33789e4c4afbb5a1d01fcfabd3001 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -314,7 +314,7 @@ int32_t init_env() { } //const char* sql = "select * from tu1"; - sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s0", g_stConfInfo.stbName); + sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName); /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() { return topic_list; } -void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - tmq_resp_err_t err; - - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); - printf("subscribe err\n"); - return; - } - int32_t cnt = 0; - /*clock_t startTime = clock();*/ - while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); - if (tmqmessage) { - cnt++; - msg_process(tmqmessage); - tmq_message_destroy(tmqmessage); - /*} else {*/ - /*break;*/ - } - } - /*clock_t endTime = clock();*/ - /*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/ - - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); - else - fprintf(stderr, "%% Consumer closed\n"); -} - void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { static const int MIN_COMMIT_COUNT = 1000; @@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog if (batchCnt != totalMsgs) { printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); - exit(-1); + /*exit(-1);*/ } if (0 == g_stConfInfo.simCase) { @@ -691,12 +661,13 @@ int main(int32_t argc, char *argv[]) { float rowsSpeed = totalRows / seconds; float msgsSpeed = totalMsgs / seconds; - walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); - if (walLogSize <= 0) { - printf("vnode2/wal size incorrect!"); - /*exit(-1);*/ - } else { - if (0 == g_stConfInfo.simCase) { + + if (0 == g_stConfInfo.simCase) { + walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); + if (walLogSize <= 0) { + printf("%s size incorrect!", g_stConfInfo.vnodeWalPath); + exit(-1); + } else { pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); } }