diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 4e5e9e45d235293ff8ee0ca226a7e5289eff65f1..a1c8690dfc14bb7d06c26cf600e6efb7df54f59e 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -826,6 +826,25 @@ TEST(clientCase, projection_query_tables) { } taos_free_result(pRes); + int64_t start = 1685959190000; + + int32_t code = -1; + for(int32_t i = 0; i < 1000000; ++i) { + char t[512] = {0}; + + sprintf(t, "insert into t1 values(%ld, %ld)", start + i, i); + while(1) { + void* p = taos_query(pConn, t); + code = taos_errno(p); + taos_free_result(p); + if (code != 0) { + printf("insert data error, retry\n"); + } else { + break; + } + } + } + for (int32_t i = 0; i < 1; ++i) { printf("create table :%d\n", i); createNewTable(pConn, i); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bc66f5a3aefa40f36f31e2dca7c252725ad0ccd6..38453ee81a0190e0712d9fb5f1f7e15432e2bc6a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -918,6 +918,7 @@ void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); + tqDebug("s-task:0x%x start to expand task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, ver); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index a9e6c2e625aa93167ef8dbd3dd2a6050c36f52f9..837afad072d820f59ad1d684e4acd7692f44df10 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -41,8 +41,9 @@ extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; -void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); -int32_t streamDispatchStreamBlock(SStreamTask* pTask); +const char* streamGetBlockTypeStr(int32_t type); +void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); +int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index df9813ad9b49b0a2fefa1f9caeeba110df585810..1e6a389486107d29655b569ae063f338cb2cd1b3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -397,8 +397,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { } } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s level:%d checkpoint(trigger)/trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", - pTask->id.idStr, pTask->info.taskLevel, total, size); + qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", + pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 041cf1f0cdd0a426785d7b4c794af91386846a92..f2e3b8529a0752711aecb3f45efabb1b47de29dd 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -244,3 +244,16 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pBlock); } } + +const char* streamGetBlockTypeStr(int32_t type) { + switch (type) { + case STREAM_INPUT__CHECKPOINT: + return "checkpoint"; + case STREAM_INPUT__CHECKPOINT_TRIGGER: + return "checkpoint-triggre"; + case STREAM_INPUT__TRANS_STATE: + return "trans-state"; + default: + return ""; + } +} \ No newline at end of file diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 9a33e7a1e1a9b0a5b141678bcdbb2b886f524b0e..194be382ff48ec477875365af014643bb30ac977 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -221,7 +221,17 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || qItem->type == STREAM_INPUT__TRANS_STATE) { if (*pInput == NULL) { - qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id); + + char* p = NULL; + if (qItem->type == STREAM_INPUT__CHECKPOINT) { + p = "checkpoint"; + } else if (qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + p = "checkpoint-trigger"; + } else { + p = "transtate"; + } + + qDebug("s-task:%s %s msg extracted, start to process immediately", id, p); *numOfBlocks = 1; *pInput = qItem; return TSDB_CODE_SUCCESS;