提交 e4a04892 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

......@@ -25,7 +25,7 @@ int32_t init_env() {
return -1;
}
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1;
......
......@@ -217,7 +217,6 @@ DLL_EXPORT void tmq_list_destroy(tmq_list_t *);
DLL_EXPORT tmq_t *tmq_consumer_new(void *conn, tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT tmq_t *tmq_consumer_new1(tmq_conf_t *conf, char *errstr, int32_t errstrLen);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
......@@ -258,7 +257,8 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message);
DLL_EXPORT char *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
DLL_EXPORT void *tmq_get_topic_schema(tmq_t *tmq, const char *topic);
DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen);
......
......@@ -192,7 +192,6 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DEPLOY, "vnode-task-deploy", SStreamTaskDeployReq, SStreamTaskDeployRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_EXEC, "vnode-task-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_PIPE_EXEC, "vnode-task-pipe-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_MERGE_EXEC, "vnode-task-merge-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
TD_DEF_MSG_TYPE(TDMT_VND_TASK_WRITE_EXEC, "vnode-task-write-exec", SStreamTaskExecReq, SStreamTaskExecRsp)
......
......@@ -62,10 +62,11 @@ typedef struct {
} STaskExec;
typedef struct {
int8_t reserved;
int32_t taskId;
} STaskDispatcherInplace;
typedef struct {
int32_t taskId;
int32_t nodeId;
SEpSet epSet;
} STaskDispatcherFixedEp;
......@@ -81,8 +82,12 @@ typedef struct {
SHashObj* pHash; // groupId to tbuid
} STaskSinkTb;
typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data);
typedef struct {
int8_t reserved;
int64_t smaId;
// following are not applicable to encoder and decoder
FSmaHandle* smaHandle;
} STaskSinkSma;
typedef struct {
......@@ -155,7 +160,8 @@ typedef struct {
STaskDispatcherShuffle shuffleDispatcher;
};
// state storage
// application storage
void* ahandle;
} SStreamTask;
......
......@@ -51,6 +51,7 @@ void taosProcCleanup(SProcObj *pProc);
int32_t taosProcRun(SProcObj *pProc);
void taosProcStop(SProcObj *pProc);
bool taosProcIsChild(SProcObj *pProc);
int32_t taosProcChildId(SProcObj *pProc);
int32_t taosProcPutToChildQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
int32_t taosProcPutToParentQueue(SProcObj *pProc, void *pHead, int32_t headLen, void *pBody, int32_t bodyLen);
......
......@@ -186,23 +186,23 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
}
}
if (strcmp(key, "connection.ip") == 0) {
if (strcmp(key, "td.connect.ip") == 0) {
conf->ip = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.user") == 0) {
if (strcmp(key, "td.connect.user") == 0) {
conf->user = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.pass") == 0) {
if (strcmp(key, "td.connect.pass") == 0) {
conf->pass = strdup(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.port") == 0) {
if (strcmp(key, "td.connect.port") == 0) {
conf->port = atoi(value);
return TMQ_CONF_OK;
}
if (strcmp(key, "connection.db") == 0) {
if (strcmp(key, "td.connect.db") == 0) {
conf->db = strdup(value);
return TMQ_CONF_OK;
}
......@@ -223,13 +223,13 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) {
}
void tmq_list_destroy(tmq_list_t* list) {
SArray* container = (SArray*)list;
SArray* container = &list->container;
/*taosArrayDestroy(container);*/
taosArrayDestroyEx(container, (void (*)(void*))taosMemoryFree);
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
tmq_message_t* msg;
tmq_message_t* msg = NULL;
while (1) {
taosGetQitem(tmq->qall, (void**)&msg);
if (msg)
......@@ -807,7 +807,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("msg discard %x\n", code);
printf("msg discard, code:%x\n", code);
goto WRITE_QUEUE_FAIL;
}
......@@ -877,10 +877,10 @@ WRITE_QUEUE_FAIL:
}
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
/*printf("call update ep %d\n", epoch);*/
bool set = false;
int32_t sz = taosArrayGetSize(pRsp->topics);
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
for (int32_t i = 0; i < sz; i++) {
SMqClientTopic topic = {0};
SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i);
......@@ -899,8 +899,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
taosArrayPush(topic.vgs, &clientVg);
set = true;
}
taosArrayPush(tmq->clientTopics, &topic);
taosArrayPush(newTopics, &topic);
}
if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = newTopics;
atomic_store_32(&tmq->epoch, epoch);
return set;
}
......@@ -1219,6 +1221,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) {
/*printf("epoch match\n");*/
SMqClientVg* pVg = rspMsg->vg;
/*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/
pVg->currentOffset = rspMsg->msg.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
return rspMsg;
......
......@@ -56,7 +56,6 @@ void dndCleanupServer(SDnode *pDnode);
int32_t dndInitClient(SDnode *pDnode);
void dndCleanupClient(SDnode *pDnode);
int32_t dndInitMsgHandle(SDnode *pDnode);
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp);
#ifdef __cplusplus
}
......
......@@ -20,7 +20,7 @@ static void dndResetLog(SMgmtWrapper *pMgmt) {
char logname[24] = {0};
snprintf(logname, sizeof(logname), "%slog", pMgmt->name);
dInfo("node:%s, reset log to %s", pMgmt->name, logname);
dInfo("node:%s, reset log to %s in child process", pMgmt->name, logname);
taosCloseLog();
taosInitLog(logname, 1);
}
......@@ -51,6 +51,7 @@ int32_t dndOpenNode(SMgmtWrapper *pWrapper) {
void dndCloseNode(SMgmtWrapper *pWrapper) {
dDebug("node:%s, start to close", pWrapper->name);
pWrapper->required = false;
taosWLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
(*pWrapper->fp.closeFp)(pWrapper);
......@@ -138,7 +139,7 @@ static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int32_t
static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRsp, int32_t msgLen, void *pCont, int32_t contLen) {
dTrace("msg:%p, get from parent queue", pRsp);
pRsp->pCont = pCont;
dndSendRpcRsp(pWrapper, pRsp);
dndSendRsp(pWrapper, pRsp);
taosMemoryFree(pRsp);
}
......@@ -178,7 +179,6 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
.parentFreeHeadFp = (ProcFreeFp)taosMemoryFree,
.parentMallocBodyFp = (ProcMallocFp)rpcMallocCont,
.parentFreeBodyFp = (ProcFreeFp)rpcFreeCont,
.testFlag = 0,
.pParent = pWrapper,
.name = pWrapper->name};
SProcObj *pProc = taosProcInit(&cfg);
......@@ -200,7 +200,7 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
dInfo("node:%s, will be initialized in child process", pWrapper->name);
dndOpenNode(pWrapper);
} else {
dInfo("node:%s, will not start in parent process", pWrapper->name);
dInfo("node:%s, will not start in parent process, child pid:%d", pWrapper->name, taosProcChildId(pProc));
pWrapper->procType = PROC_PARENT;
}
......@@ -210,16 +210,20 @@ static int32_t dndRunInMultiProcess(SDnode *pDnode) {
}
}
#if 0
SMgmtWrapper *pWrapper = dndAcquireWrapper(pDnode, DNODE);
if (pWrapper->procType == PROC_PARENT && dmStart(pWrapper->pMgmt) != 0) {
dndReleaseWrapper(pWrapper);
dError("failed to start dnode worker since %s", terrstr());
return -1;
dndSetStatus(pDnode, DND_STAT_RUNNING);
for (ENodeType n = 0; n < NODE_MAX; ++n) {
SMgmtWrapper *pWrapper = &pDnode->wrappers[n];
if (!pWrapper->required) continue;
if (pWrapper->fp.startFp == NULL) continue;
if (pWrapper->procType == PROC_PARENT && n != DNODE) continue;
if (pWrapper->procType == PROC_CHILD && n == DNODE) continue;
if ((*pWrapper->fp.startFp)(pWrapper) != 0) {
dError("node:%s, failed to start since %s", pWrapper->name, terrstr());
return -1;
}
}
dndReleaseWrapper(pWrapper);
#endif
return 0;
}
......
......@@ -16,14 +16,16 @@
#define _DEFAULT_SOURCE
#include "dndInt.h"
#define MAXLEN 1024
int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0;
int32_t maxLen = 1024;
char *content = taosMemoryCalloc(1, maxLen + 1);
cJSON *root = NULL;
char file[PATH_MAX];
TdFilePtr pFile = NULL;
int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR;
int32_t len = 0;
const int32_t maxLen = MAXLEN;
char content[MAXLEN + 1] = {0};
cJSON *root = NULL;
char file[PATH_MAX];
TdFilePtr pFile = NULL;
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
pFile = taosOpenFile(file, TD_FILE_READ);
......@@ -57,7 +59,6 @@ int32_t dndReadFile(SMgmtWrapper *pWrapper, bool *pDeployed) {
dDebug("succcessed to read file %s, deployed:%d", file, *pDeployed);
_OVER:
if (content != NULL) taosMemoryFree(content);
if (root != NULL) cJSON_Delete(root);
if (pFile != NULL) taosCloseFile(&pFile);
......@@ -66,7 +67,7 @@ _OVER:
}
int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
char file[PATH_MAX];
char file[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC);
......@@ -76,9 +77,9 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
return -1;
}
int32_t len = 0;
int32_t maxLen = 1024;
char *content = taosMemoryCalloc(1, maxLen + 1);
int32_t len = 0;
const int32_t maxLen = MAXLEN;
char content[MAXLEN + 1] = {0};
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"deployed\": %d\n", deployed);
......@@ -87,9 +88,8 @@ int32_t dndWriteFile(SMgmtWrapper *pWrapper, bool deployed) {
taosWriteFile(pFile, content, len);
taosFsyncFile(pFile);
taosCloseFile(&pFile);
taosMemoryFree(content);
char realfile[PATH_MAX];
char realfile[PATH_MAX] = {0};
snprintf(realfile, sizeof(realfile), "%s%s%s.json", pWrapper->path, TD_DIRSEP, pWrapper->name);
if (taosRenameFile(file, realfile) != 0) {
......
......@@ -43,36 +43,40 @@ static inline int32_t dndBuildMsg(SNodeMsg *pMsg, SRpcMsg *pRpc) {
memcpy(pMsg->user, connInfo.user, TSDB_USER_LEN);
memcpy(&pMsg->rpcMsg, pRpc, sizeof(SRpcMsg));
return 0;
}
void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpSet) {
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
int32_t code = -1;
SNodeMsg *pMsg = NULL;
NodeMsgFp msgFp = NULL;
if (pEpSet && pEpSet->numOfEps > 0 && pRpc->msgType == TDMT_MND_STATUS_RSP) {
dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet);
}
if (dndMarkWrapper(pWrapper) != 0) goto _OVER;
if ((msgFp = dndGetMsgFp(pWrapper, pRpc)) == NULL) goto _OVER;
if ((pMsg = taosAllocateQitem(sizeof(SNodeMsg))) == NULL) goto _OVER;
if (dndBuildMsg(pMsg, pRpc) != 0) goto _OVER;
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
if (pWrapper->procType == PROC_SINGLE) {
dTrace("msg:%p, is created, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle, pMsg->user);
code = (*msgFp)(pWrapper->pMgmt, pMsg);
} else if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is created and will put into child queue, handle:%p app:%p user:%s", pMsg, pRpc->handle,
pRpc->ahandle, pMsg->user);
code = taosProcPutToChildQueue(pWrapper->pProc, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen);
} else {
dTrace("msg:%p, should not processed in child process, handle:%p app:%p user:%s", pMsg, pRpc->handle, pRpc->ahandle,
pMsg->user);
ASSERT(1);
}
_OVER:
if (code == 0) {
if (pWrapper->procType == PROC_PARENT) {
dTrace("msg:%p, is freed", pMsg);
dTrace("msg:%p, is freed in parent process", pMsg);
taosFreeQitem(pMsg);
rpcFreeCont(pRpc->pCont);
}
......
......@@ -175,7 +175,7 @@ int32_t dndMarkWrapper(SMgmtWrapper *pWrapper) {
int32_t code = 0;
taosRLockLatch(&pWrapper->latch);
if (pWrapper->deployed) {
if (pWrapper->deployed || (pWrapper->procType == PROC_PARENT && pWrapper->required)) {
int32_t refCount = atomic_add_fetch_32(&pWrapper->refCount, 1);
dTrace("node:%s, is marked, refCount:%d", pWrapper->name, refCount);
} else {
......
......@@ -348,7 +348,7 @@ int32_t dndSendReqToMnode(SMgmtWrapper *pWrapper, SRpcMsg *pReq) {
}
}
void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
static void dndSendRpcRsp(SMgmtWrapper *pWrapper, SRpcMsg *pRsp) {
if (pRsp->code == TSDB_CODE_APP_NOT_READY) {
SMgmtWrapper *pDnodeWrapper = dndAcquireWrapper(pWrapper->pDnode, DNODE);
if (pDnodeWrapper != NULL) {
......
......@@ -279,7 +279,6 @@ void vmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_VND_CONSUME, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY, (NodeMsgFp)vmProcessWriteMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_PIPE_EXEC, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_MERGE_EXEC, (NodeMsgFp)vmProcessMergeMsg, VND_VGID);
dndSetMsgHandle(pWrapper, TDMT_VND_STREAM_TRIGGER, (NodeMsgFp)vmProcessFetchMsg, VND_VGID);
......
......@@ -160,6 +160,24 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
}
}
static void vmProcessMergeQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SVnodeObj *pVnode = pInfo->ahandle;
SNodeMsg *pMsg = NULL;
for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, (void **)&pMsg);
dTrace("msg:%p, will be processed in vnode-merge queue", pMsg);
int32_t code = vnodeProcessFetchMsg(pVnode->pImpl, &pMsg->rpcMsg);
if (code != 0) {
vmSendRsp(pVnode->pWrapper, pMsg, code);
dTrace("msg:%p, is freed, result:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code));
rpcFreeCont(pMsg->rpcMsg.pCont);
taosFreeQitem(pMsg);
}
}
}
static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueType qtype) {
SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1;
......@@ -308,7 +326,7 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) {
int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) {
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue);
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue);
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeMsg);
pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeQueue);
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
......
......@@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode);
int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream);
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId);
#ifdef __cplusplus
}
......
......@@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans);
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId);
#ifdef __cplusplus
}
......
......@@ -36,11 +36,11 @@ int32_t tEncodeSStreamObj(SCoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SArray *pArray = taosArrayGet(pObj->tasks, i);
SArray *pArray = taosArrayGetP(pObj->tasks, i);
int32_t innerSz = taosArrayGetSize(pArray);
if (tEncodeI32(pEncoder, innerSz) < 0) return -1;
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask *pTask = taosArrayGet(pArray, j);
SStreamTask *pTask = taosArrayGetP(pArray, j);
if (tEncodeSStreamTask(pEncoder, pTask) < 0) return -1;
}
}
......@@ -76,17 +76,18 @@ int32_t tDecodeSStreamObj(SCoder *pDecoder, SStreamObj *pObj) {
int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1;
if (sz != 0) {
pObj->tasks = taosArrayInit(sz, sizeof(SArray));
pObj->tasks = taosArrayInit(sz, sizeof(void *));
for (int32_t i = 0; i < sz; i++) {
int32_t innerSz;
if (tDecodeI32(pDecoder, &innerSz) < 0) return -1;
SArray *pArray = taosArrayInit(innerSz, sizeof(SStreamTask));
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
for (int32_t j = 0; j < innerSz; j++) {
SStreamTask task;
if (tDecodeSStreamTask(pDecoder, &task) < 0) return -1;
taosArrayPush(pArray, &task);
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) return -1;
if (tDecodeSStreamTask(pDecoder, pTask) < 0) return -1;
taosArrayPush(pArray, &pTask);
}
taosArrayPush(pObj->tasks, pArray);
taosArrayPush(pObj->tasks, &pArray);
}
}
......
......@@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) {
return pVgroup;
}
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) {
SSdb* pSdb = pMnode->pSdb;
SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan);
if (pPlan == NULL) {
......@@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
// only for inplace
pTask->sinkType = TASK_SINK__SHOW;
pTask->showSink.reserved = 0;
if (smaId != -1) {
pTask->sinkType = TASK_SINK__SMA;
pTask->smaSink.smaId = smaId;
}
} else {
pTask->sinkType = TASK_SINK__NONE;
}
......@@ -185,6 +189,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
pTask->dispatchMsgType = TDMT_VND_TASK_MERGE_EXEC;
pTask->dispatchType = TASK_DISPATCH__FIXED;
pTask->fixedEpDispatcher.taskId = lastLevelTask->taskId;
pTask->fixedEpDispatcher.nodeId = lastLevelTask->nodeId;
pTask->fixedEpDispatcher.epSet = lastLevelTask->epSet;
}
......
......@@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {}
static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
int32_t size =
sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE;
SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size);
if (pRaw == NULL) goto _OVER;
......@@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER;
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
......@@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) {
mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
goto _OVER;
}
pStream = mndAcquireStream(pMnode, createReq.name);
if (pStream != NULL) {
mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name);
......
......@@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
return code;
}
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) {
int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) {
SNode *pAst = NULL;
if (nodesStringToNode(ast, &pAst) < 0) {
......@@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast
return -1;
}
if (mndScheduleStream(pMnode, pTrans, pStream) < 0) {
if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) {
mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr());
return -1;
}
......@@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
}
mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name);
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) {
if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) {
mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr());
mndTransDrop(pTrans);
return -1;
......
......@@ -21,7 +21,7 @@
#include "mndTrans.h"
#include "tbase64.h"
#define TSDB_USER_VER_NUMBER 1
#define TSDB_USER_VER_NUMBER 1
#define TSDB_USER_RESERVE_SIZE 64
static int32_t mndCreateDefaultUsers(SMnode *pMnode);
......@@ -270,7 +270,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, SCreateUserReq *pCreate
userObj.updateTime = userObj.createdTime;
userObj.superUser = pCreate->superUser;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_USER, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("user:%s, failed to create since %s", pCreate->user, terrstr());
return -1;
......@@ -350,7 +350,7 @@ CREATE_USER_OVER:
}
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOld, SUserObj *pNew, SNodeMsg *pReq) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER,&pReq->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_ALTER_USER, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("user:%s, failed to update since %s", pOld->user, terrstr());
return -1;
......@@ -511,7 +511,7 @@ ALTER_USER_OVER:
}
static int32_t mndDropUser(SMnode *pMnode, SNodeMsg *pReq, SUserObj *pUser) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK,TRN_TYPE_DROP_USER, &pReq->rpcMsg);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_DROP_USER, &pReq->rpcMsg);
if (pTrans == NULL) {
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
return -1;
......
......@@ -198,10 +198,13 @@ int tqCommit(STQ*);
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
int32_t tqProcessRebReq(STQ* pTq, char* msg);
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg);
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen);
// sma
void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data);
#ifdef __cplusplus
}
#endif
......
......@@ -42,8 +42,8 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pVnodeMeta, STq
// TODO: error code of buffer pool
}
#endif
pTq->tqMeta =
tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer, (FTqDelete)taosMemoryFree, 0);
pTq->tqMeta = tqStoreOpen(pTq, path, (FTqSerialize)tqSerializeConsumer, (FTqDeserialize)tqDeserializeConsumer,
(FTqDelete)taosMemoryFree, 0);
if (pTq->tqMeta == NULL) {
taosMemoryFree(pTq);
#if 0
......@@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
if (tqExpandTask(pTq, pTask, 4) < 0) {
ASSERT(0);
}
pTask->ahandle = pTq->pVnode;
taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask));
......@@ -497,13 +498,15 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen) {
return 0;
}
int32_t tqProcessTaskExec(STQ* pTq, SRpcMsg* msg) {
SStreamTaskExecReq* pReq = msg->pCont;
int32_t taskId = pReq->taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen) {
SStreamTaskExecReq req;
tDecodeSStreamTaskExecReq(msg, &req);
int32_t taskId = req.taskId;
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
ASSERT(pTask);
if (streamExecTask(pTask, &pTq->pVnode->msgCb, pReq->data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
if (streamExecTask(pTask, &pTq->pVnode->msgCb, req.data, STREAM_DATA_TYPE_SSDATA_BLOCK, 0) < 0) {
// TODO
}
return 0;
......
......@@ -43,6 +43,8 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
vTrace("message in fetch queue is processing");
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
switch (pMsg->msgType) {
case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
......@@ -65,8 +67,9 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
return vnodeGetTableMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_VND_TASK_EXEC:
return tqProcessTaskExec(pVnode->pTq, pMsg);
case TDMT_VND_TASK_PIPE_EXEC:
case TDMT_VND_TASK_MERGE_EXEC:
return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen);
case TDMT_VND_STREAM_TRIGGER:
return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen);
case TDMT_VND_QUERY_HEARTBEAT:
......
......@@ -178,6 +178,11 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_TASK_WRITE_EXEC: {
if (tqProcessTaskExec(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)),
pMsg->contLen - sizeof(SMsgHead)) < 0) {
}
} break;
case TDMT_VND_CREATE_SMA: { // timeRangeSMA
#if 1
......
......@@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
if (pTask->sinkType == TASK_SINK__TABLE) {
//
} else if (pTask->sinkType == TASK_SINK__SMA) {
pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes);
//
} else if (pTask->sinkType == TASK_SINK__FETCH) {
//
......@@ -121,7 +122,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
SStreamTaskExecReq req = {
.streamId = pTask->streamId,
.taskId = pTask->taskId,
.taskId = pTask->fixedEpDispatcher.taskId,
.data = pRes,
};
......@@ -205,14 +206,22 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
// TODO: wrap
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tEncodeI8(pEncoder, pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tEncodeI8(pEncoder, pTask->inplaceDispatcher.reserved) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
......@@ -243,13 +252,22 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) {
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
}
if (pTask->sinkType != TASK_SINK__NONE) {
if (pTask->sinkType == TASK_SINK__TABLE) {
if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SMA) {
if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__FETCH) {
if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
} else if (pTask->sinkType == TASK_SINK__SHOW) {
if (tDecodeI8(pDecoder, &pTask->showSink.reserved) < 0) return -1;
} else {
ASSERT(pTask->sinkType == TASK_SINK__NONE);
}
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
if (tDecodeI8(pDecoder, &pTask->inplaceDispatcher.reserved) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->inplaceDispatcher.taskId) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1;
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
......
......@@ -19,13 +19,13 @@
#include "tref.h"
#include "walInt.h"
int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
int64_t FORCE_INLINE walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
}
......@@ -46,7 +46,7 @@ void* tmemmem(char* haystack, int hlen, char* needle, int nlen) {
return NULL;
}
static inline int64_t walScanLogGetLastVer(SWal* pWal) {
static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) {
ASSERT(pWal->fileInfoSet != NULL);
int sz = taosArrayGetSize(pWal->fileInfoSet);
ASSERT(sz > 0);
......
......@@ -74,9 +74,9 @@ int walSetWrite(SWal* pWal) {
}
int walChangeWrite(SWal* pWal, int64_t ver) {
int code = 0;
int code;
TdFilePtr pIdxTFile, pLogTFile;
char fnameStr[WAL_FILE_LEN];
char fnameStr[WAL_FILE_LEN];
if (pWal->pWriteLogTFile != NULL) {
code = taosCloseFile(&pWal->pWriteLogTFile);
if (code != 0) {
......@@ -133,7 +133,6 @@ int walSeekWriteVer(SWal* pWal, int64_t ver) {
return -1;
}
if (ver < pWal->vers.snapshotVer) {
}
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeWrite(pWal, ver);
......
......@@ -56,7 +56,6 @@ typedef struct SProcObj {
int32_t pid;
bool isChild;
bool stopFlag;
bool testFlag;
} SProcObj;
static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
......@@ -77,7 +76,7 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
goto _OVER;
}
shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), 0600);
shmid = shmget(IPC_PRIVATE, sizeof(TdThreadMutex), IPC_CREAT | 0600);
if (shmid <= 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to init mutex while shmget since %s", terrstr());
......@@ -101,8 +100,13 @@ static int32_t taosProcInitMutex(TdThreadMutex **ppMutex, int32_t *pShmid) {
_OVER:
if (code != 0) {
taosThreadMutexDestroy(pMutex);
shmctl(shmid, IPC_RMID, NULL);
if (pMutex != NULL) {
taosThreadMutexDestroy(pMutex);
shmdt(pMutex);
}
if (shmid >= 0) {
shmctl(shmid, IPC_RMID, NULL);
}
} else {
*ppMutex = pMutex;
*pShmid = shmid;
......@@ -112,12 +116,12 @@ _OVER:
return code;
}
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t *pShmid) {
static void taosProcDestroyMutex(TdThreadMutex *pMutex, int32_t shmid) {
if (pMutex != NULL) {
taosThreadMutexDestroy(pMutex);
}
if (*pShmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL);
if (shmid >= 0) {
shmctl(shmid, IPC_RMID, NULL);
}
}
......@@ -141,13 +145,14 @@ static int32_t taosProcInitBuffer(void **ppBuffer, int32_t size) {
return shmid;
}
static void taosProcDestroyBuffer(void *pBuffer, int32_t *pShmid) {
if (*pShmid > 0) {
shmctl(*pShmid, IPC_RMID, NULL);
static void taosProcDestroyBuffer(void *pBuffer, int32_t shmid) {
if (shmid > 0) {
shmdt(pBuffer);
shmctl(shmid, IPC_RMID, NULL);
}
}
static SProcQueue *taosProcQueueInit(int32_t size) {
static SProcQueue *taosProcInitQueue(int32_t size) {
if (size <= 0) size = SHM_DEFAULT_SIZE;
int32_t bufSize = CEIL8(size);
......@@ -155,29 +160,28 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
SProcQueue *pQueue = NULL;
int32_t shmId = taosProcInitBuffer((void **)&pQueue, bufSize + headSize);
if (shmId <= 0) {
if (shmId < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pQueue->bufferShmid = shmId;
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosMemoryFree(pQueue);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL;
}
if (tsem_init(&pQueue->sem, 1, 0) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
taosMemoryFree(pQueue);
taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
if (taosProcInitMutex(&pQueue->mutex, &pQueue->mutexShmid) != 0) {
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem);
taosMemoryFree(pQueue);
taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
return NULL;
}
......@@ -190,12 +194,12 @@ static SProcQueue *taosProcQueueInit(int32_t size) {
return pQueue;
}
static void taosProcQueueCleanup(SProcQueue *pQueue) {
static void taosProcCleanupQueue(SProcQueue *pQueue) {
if (pQueue != NULL) {
uDebug("proc:%s, queue:%p clean up", pQueue->name, pQueue);
taosProcDestroyMutex(pQueue->mutex, &pQueue->mutexShmid);
tsem_destroy(&pQueue->sem);
taosMemoryFree(pQueue);
taosProcDestroyMutex(pQueue->mutex, pQueue->mutexShmid);
taosProcDestroyBuffer(pQueue, pQueue->bufferShmid);
}
}
......@@ -204,6 +208,11 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
const int32_t bodyLen = CEIL8(rawBodyLen);
const int32_t fullLen = headLen + bodyLen + 8;
if (headLen <= 0 || bodyLen <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
taosThreadMutexLock(pQueue->mutex);
if (fullLen > pQueue->avail) {
taosThreadMutexUnlock(pQueue->mutex);
......@@ -255,7 +264,7 @@ static int32_t taosProcQueuePush(SProcQueue *pQueue, char *pHead, int32_t rawHea
taosThreadMutexUnlock(pQueue->mutex);
tsem_post(&pQueue->sem);
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, rawHeadLen, pBody, rawBodyLen, pQueue);
uTrace("proc:%s, push msg:%p:%d cont:%p:%d to queue:%p", pQueue->name, pHead, headLen, pBody, bodyLen, pQueue);
return 0;
}
......@@ -344,12 +353,10 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
}
pProc->name = pCfg->name;
pProc->testFlag = pCfg->testFlag;
pProc->pChildQueue = taosProcQueueInit(pCfg->childQueueSize);
pProc->pParentQueue = taosProcQueueInit(pCfg->parentQueueSize);
pProc->pChildQueue = taosProcInitQueue(pCfg->childQueueSize);
pProc->pParentQueue = taosProcInitQueue(pCfg->parentQueueSize);
if (pProc->pChildQueue == NULL || pProc->pParentQueue == NULL) {
taosProcQueueCleanup(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pChildQueue);
taosMemoryFree(pProc);
return NULL;
}
......@@ -369,17 +376,15 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) {
pProc->pParentQueue->freeBodyFp = pCfg->parentFreeBodyFp;
pProc->pParentQueue->consumeFp = pCfg->parentConsumeFp;
uDebug("proc:%s, initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
uDebug("proc:%s, is initialized, child queue:%p parent queue:%p", pProc->name, pProc->pChildQueue, pProc->pParentQueue);
if (!pProc->testFlag) {
pProc->pid = fork();
if (pProc->pid == 0) {
pProc->isChild = 1;
uInfo("this is child process, pid:%d", pProc->pid);
} else {
pProc->isChild = 0;
uInfo("this is parent process, pid:%d", pProc->pid);
}
pProc->pid = fork();
if (pProc->pid == 0) {
pProc->isChild = 1;
prctl(PR_SET_NAME, pProc->name, NULL, NULL, NULL);
} else {
pProc->isChild = 0;
uInfo("this is parent process, child pid:%d", pProc->pid);
}
return pProc;
......@@ -398,7 +403,7 @@ static void taosProcThreadLoop(SProcQueue *pQueue) {
if (code < 0) {
uDebug("proc:%s, get no message from queue:%p and exiting", pQueue->name, pQueue);
break;
} else if (code < 0) {
} else if (code == 0) {
uTrace("proc:%s, get no message from queue:%p since %s", pQueue->name, pQueue, terrstr());
taosMsleep(1);
continue;
......@@ -413,16 +418,14 @@ int32_t taosProcRun(SProcObj *pProc) {
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (pProc->isChild || pProc->testFlag) {
if (pProc->isChild) {
if (taosThreadCreate(&pProc->childThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pChildQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
return -1;
}
uDebug("proc:%s, child start to consume queue:%p", pProc->name, pProc->pChildQueue);
}
if (!pProc->isChild || pProc->testFlag) {
} else {
if (taosThreadCreate(&pProc->parentThread, &thAttr, (ProcThreadFp)taosProcThreadLoop, pProc->pParentQueue) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to create thread since %s", terrstr());
......@@ -441,12 +444,14 @@ void taosProcStop(SProcObj *pProc) {
bool taosProcIsChild(SProcObj *pProc) { return pProc->isChild; }
int32_t taosProcChildId(SProcObj *pProc) { return pProc->pid; }
void taosProcCleanup(SProcObj *pProc) {
if (pProc != NULL) {
uDebug("proc:%s, clean up", pProc->name);
taosProcStop(pProc);
taosProcQueueCleanup(pProc->pChildQueue);
taosProcQueueCleanup(pProc->pParentQueue);
taosProcCleanupQueue(pProc->pChildQueue);
taosProcCleanupQueue(pProc->pParentQueue);
taosMemoryFree(pProc);
}
}
......
......@@ -45,7 +45,7 @@ print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c
system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
print cmd result----> $system_content
if $system_content != @{consume success: 100}@ then
print not match in pos000
return -1
endi
sql show databases
......
......@@ -314,7 +314,7 @@ int32_t init_env() {
}
//const char* sql = "select * from tu1";
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s0", g_stConfInfo.stbName);
sprintf(sqlStr, "create topic test_stb_topic_1 as select ts,c0 from %s", g_stConfInfo.stbName);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {
......@@ -351,36 +351,6 @@ tmq_list_t* build_topic_list() {
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
/*clock_t startTime = clock();*/
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
cnt++;
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
/*} else {*/
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
......@@ -438,7 +408,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
if (batchCnt != totalMsgs) {
printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
exit(-1);
/*exit(-1);*/
}
if (0 == g_stConfInfo.simCase) {
......@@ -691,12 +661,13 @@ int main(int32_t argc, char *argv[]) {
float rowsSpeed = totalRows / seconds;
float msgsSpeed = totalMsgs / seconds;
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) {
printf("vnode2/wal size incorrect!");
/*exit(-1);*/
} else {
if (0 == g_stConfInfo.simCase) {
if (0 == g_stConfInfo.simCase) {
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) {
printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);
exit(-1);
} else {
pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
}
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册