未验证 提交 c10f9a18 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #11045 from taosdata/feature/tq

multiple level stream schedule
...@@ -25,7 +25,7 @@ int32_t init_env() { ...@@ -25,7 +25,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
......
...@@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *); ...@@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen); DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */ /* ------------------------TMQ CONSUMER INTERFACE------------------------ */
...@@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); ...@@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic); DLL_EXPORT void *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
...@@ -62,10 +62,11 @@ typedef struct { ...@@ -62,10 +62,11 @@ typedef struct {
} STaskExec; } STaskExec;
typedef struct { typedef struct {
int8_t reserved; int32_t taskId;
} STaskDispatcherInplace; } STaskDispatcherInplace;
typedef struct { typedef struct {
int32_t taskId;
int32_t nodeId; int32_t nodeId;
SEpSet epSet; SEpSet epSet;
} STaskDispatcherFixedEp; } STaskDispatcherFixedEp;
......
...@@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value ...@@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
} }
} }
if (strcmp(key, "connection.ip") == 0) { if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value); conf->ip = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "connection.user") == 0) { if (strcmp(key, "td.connect.user") == 0) {
conf->user = strdup(value); conf->user = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "connection.pass") == 0) { if (strcmp(key, "td.connect.pass") == 0) {
conf->pass = strdup(value); conf->pass = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "connection.port") == 0) { if (strcmp(key, "td.connect.port") == 0) {
conf->port = atoi(value); conf->port = atoi(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
if (strcmp(key, "connection.db") == 0) { if (strcmp(key, "td.connect.db") == 0) {
conf->db = strdup(value); conf->db = strdup(value);
return TMQ_CONF_OK; return TMQ_CONF_OK;
} }
...@@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { ...@@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
} }
void tmq_list_destroy(tmq_list_t* list) { void tmq_list_destroy(tmq_list_t* list) {
SArray* container = (SArray*)list; SArray* container = &list->container;
/*taosArrayDestroy(container);*/ /*taosArrayDestroy(container);*/
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree); taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
} }
void tmqClearUnhandleMsg(tmq_t* tmq) { void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg; tmq_message_t* msg = NULL;
while (1) { while (1) {
taosGetQitem(tmq->qall, (void**)&msg); taosGetQitem(tmq->qall, (void**)&msg);
if (msg) if (msg)
...@@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { ...@@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg; SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq; tmq_t* tmq = pParam->tmq;
if (code != 0) { if (code != 0) {
printf("msg discard %x\n", code); printf("msg discard, code:%x\n", code);
goto WRITE_QUEUE_FAIL; goto WRITE_QUEUE_FAIL;
} }
...@@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL: ...@@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL:
} }
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
printf("call update ep %d\n", epoch);
bool set = false; bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics); int32_t sz = taosArrayGetSize(pRsp->topics);
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0}; SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
...@@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { ...@@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
taosArrayPush(topic.vgs, &clientVg); taosArrayPush(topic.vgs, &clientVg);
set = true; set = true;
} }
taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(newTopics, &topic);
} }
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = newTopics;
atomic_store_32(&tmq->epoch, epoch); atomic_store_32(&tmq->epoch, epoch);
return set; return set;
} }
...@@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese ...@@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) { if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/ /*printf("epoch match\n");*/
SMqClientVg* pVg = rspMsg->vg; SMqClientVg* pVg = rspMsg->vg;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset; pVg->currentOffset = rspMsg->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg; return rspMsg;
......
...@@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf ...@@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
} }
} }
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
}
}
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) { static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
...@@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { ...@@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeMsg); pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
......
...@@ -185,6 +185,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { ...@@ -185,6 +185,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC; pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
pTask->dispatchType = TASK_DISPATCH__FIXED; pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId; pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet; pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
} }
......
...@@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq ...@@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
// TODO: error code of buffer pool // TODO: error code of buffer pool
} }
#endif #endif
pTq->tqMeta = pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0); (FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq); taosMemoryFree(pTq);
#if 0 #if 0
...@@ -498,12 +498,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) { ...@@ -498,12 +498,16 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
} }
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) { int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
SStreamTaskExecReq* pReq = msg->pCont; char* msgstr = POINTER_SHIFT(msg->pCont, sizeof(SMsgHead));
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msgstr, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask); ASSERT(pTask);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) { if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
// TODO // TODO
} }
return 0; return 0;
......
...@@ -66,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { ...@@ -66,6 +66,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg); return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC: case TDMT_VND_TASK_EXEC:
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg); return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_TRIGGER: case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen); return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
......
...@@ -121,7 +121,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in ...@@ -121,7 +121,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SStreamTaskExecReq req = { SStreamTaskExecReq req = {
.streamId = pTask->streamId, .streamId = pTask->streamId,
.taskId = pTask->taskId, .taskId = pTask->fixedEpDispatcher.taskId,
.data = pRes, .data = pRes,
}; };
...@@ -211,8 +211,9 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { ...@@ -211,8 +211,9 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1; if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
...@@ -248,8 +249,9 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { ...@@ -248,8 +249,9 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
} }
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
......
...@@ -19,13 +19,13 @@ ...@@ -19,13 +19,13 @@
#include "tref.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; } int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; } int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; } int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
} }
...@@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) { ...@@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
return NULL; return NULL;
} }
static inline int64_t walScanLogGetLastVer(SWal* pWal) { static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL); ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet); int sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0); ASSERT(sz > 0);
......
...@@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) { ...@@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) {
} }
int walChangeWrite(SWal* pWal, int64_t ver) { int walChangeWrite(SWal* pWal, int64_t ver) {
int code = 0; int code;
TdFilePtr pIdxTFile, pLogTFile; TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN]; char fnameStr[WAL_FILE_LEN];
if (pWal->pWriteLogTFile != NULL) { if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile); code = taosCloseFile(&pWal->pWriteLogTFile);
if (code != 0) { if (code != 0) {
...@@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) { ...@@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) {
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeWrite(pWal, ver); code = walChangeWrite(pWal, ver);
......
...@@ -314,7 +314,7 @@ int32_t init_env() { ...@@ -314,7 +314,7 @@ int32_t init_env() {
} }
//const char* sql = "select * from tu1"; //const char* sql = "select * from tu1";
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s0", g_stConfInfo.stbName); sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr); pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
...@@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() { ...@@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() {
return topic_list; return topic_list;
} }
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000; static const int MIN_COMMIT_COUNT = 1000;
...@@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog ...@@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (batchCnt != totalMsgs) { if (batchCnt != totalMsgs) {
printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
exit(-1); /*exit(-1);*/
} }
if (0 == g_stConfInfo.simCase) { if (0 == g_stConfInfo.simCase) {
...@@ -691,6 +661,7 @@ int main(int32_t argc, char *argv[]) { ...@@ -691,6 +661,7 @@ int main(int32_t argc, char *argv[]) {
float rowsSpeed = totalRows / seconds; float rowsSpeed = totalRows / seconds;
float msgsSpeed = totalMsgs / seconds; float msgsSpeed = totalMsgs / seconds;
if (0 == g_stConfInfo.simCase) { if (0 == g_stConfInfo.simCase) {
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) { if (walLogSize <= 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册