未验证 提交 2af07843 编写于 作者: L Liu Jicong 提交者: GitHub

Merge pull request #10755 from taosdata/feature/tq

Feature/tq
...@@ -190,7 +190,10 @@ typedef struct SEp { ...@@ -190,7 +190,10 @@ typedef struct SEp {
typedef struct { typedef struct {
int32_t contLen; int32_t contLen;
int32_t vgId; union {
int32_t vgId;
int32_t streamTaskId;
};
} SMsgHead; } SMsgHead;
// Submit message for one table // Submit message for one table
......
...@@ -23,6 +23,9 @@ ...@@ -23,6 +23,9 @@
extern "C" { extern "C" {
#endif #endif
#define SND_UNIQUE_THREAD_NUM 2
#define SND_SHARED_THREAD_NUM 2
/* ------------------------ TYPES EXPOSED ------------------------ */ /* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef struct SSnode SSnode; typedef struct SSnode SSnode;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "taoserror.h"
#include "thash.h"
/**
* Generate an non-negative signed 32bit id
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 6bit |6bit | 12bit | 8bit |
*+------------+-----+-----------+---------------+
* @return
*/
int32_t tGenIdPI32(void);
/**
* Generate an non-negative signed 64bit id
*+------------+-----+-----------+---------------+
*| uid|localIp| PId | timestamp | serial number |
*+------------+-----+-----------+---------------+
*| 12bit |12bit|24bit |16bit |
*+------------+-----+-----------+---------------+
* @return
*/
int64_t tGenIdPI64(void);
...@@ -166,7 +166,7 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { ...@@ -166,7 +166,7 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) {
static int32_t dndStartSnodeWorker(SDnode *pDnode) { static int32_t dndStartSnodeWorker(SDnode *pDnode) {
SSnodeMgmt *pMgmt = &pDnode->smgmt; SSnodeMgmt *pMgmt = &pDnode->smgmt;
pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *)); pMgmt->uniqueWorkers = taosArrayInit(0, sizeof(void *));
for (int32_t i = 0; i < 2; i++) { for (int32_t i = 0; i < SND_UNIQUE_THREAD_NUM; i++) {
SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker)); SDnodeWorker *pUniqueWorker = malloc(sizeof(SDnodeWorker));
if (pUniqueWorker == NULL) { if (pUniqueWorker == NULL) {
return -1; return -1;
...@@ -177,8 +177,8 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) { ...@@ -177,8 +177,8 @@ static int32_t dndStartSnodeWorker(SDnode *pDnode) {
} }
taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker); taosArrayPush(pMgmt->uniqueWorkers, &pUniqueWorker);
} }
if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", 4, 4, if (dndInitWorker(pDnode, &pMgmt->sharedWorker, DND_WORKER_SINGLE, "snode-shared", SND_SHARED_THREAD_NUM,
dndProcessSnodeSharedQueue)) { SND_SHARED_THREAD_NUM, dndProcessSnodeSharedQueue)) {
dError("failed to start snode shared worker since %s", terrstr()); dError("failed to start snode shared worker since %s", terrstr());
return -1; return -1;
} }
...@@ -369,13 +369,39 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) { ...@@ -369,13 +369,39 @@ static void dndProcessSnodeSharedQueue(SDnode *pDnode, SRpcMsg *pMsg) {
taosFreeQitem(pMsg); taosFreeQitem(pMsg);
} }
static void dndWriteSnodeMsgToRandomWorker(SDnode *pDnode, SRpcMsg *pMsg) { static FORCE_INLINE int32_t dndGetSWIdFromMsg(SRpcMsg *pMsg) {
SMsgHead *pHead = pMsg->pCont;
pHead->streamTaskId = htonl(pHead->streamTaskId);
return pHead->streamTaskId % SND_UNIQUE_THREAD_NUM;
}
static void dndWriteSnodeMsgToWorkerByMsg(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) {
int32_t index = dndGetSWIdFromMsg(pMsg);
SDnodeWorker *pWorker = taosArrayGetP(pDnode->smgmt.uniqueWorkers, index);
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
}
dndReleaseSnode(pDnode, pSnode);
if (code != 0) {
if (pMsg->msgType & 1u) {
SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code};
rpcSendResponse(&rsp);
}
rpcFreeCont(pMsg->pCont);
}
}
static void dndWriteSnodeMsgToMgmtWorker(SDnode *pDnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED;
SSnode *pSnode = dndAcquireSnode(pDnode); SSnode *pSnode = dndAcquireSnode(pDnode);
if (pSnode != NULL) { if (pSnode != NULL) {
int32_t index = (pDnode->smgmt.uniqueWorkerInUse + 1) % taosArrayGetSize(pDnode->smgmt.uniqueWorkers); SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, 0);
SDnodeWorker *pWorker = taosArrayGet(pDnode->smgmt.uniqueWorkers, index);
code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg));
} }
dndReleaseSnode(pDnode, pSnode); dndReleaseSnode(pDnode, pSnode);
...@@ -407,9 +433,12 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc ...@@ -407,9 +433,12 @@ static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpc
} }
} }
void dndProcessSnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
dndWriteSnodeMsgToMgmtWorker(pDnode, pMsg);
}
void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeUniqueMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
// judge from msg to write to unique queue dndWriteSnodeMsgToWorkerByMsg(pDnode, pMsg);
dndWriteSnodeMsgToRandomWorker(pDnode, pMsg);
} }
void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { void dndProcessSnodeSharedMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) {
......
...@@ -30,41 +30,42 @@ extern "C" { ...@@ -30,41 +30,42 @@ extern "C" {
#endif #endif
enum { enum {
STREAM_STATUS__READY = 1, STREAM_STATUS__RUNNING = 1,
STREAM_STATUS__STOPPED, STREAM_STATUS__STOPPED,
STREAM_STATUS__CREATING, STREAM_STATUS__CREATING,
STREAM_STATUS__STOPING, STREAM_STATUS__STOPING,
STREAM_STATUS__RESUMING, STREAM_STATUS__RESTORING,
STREAM_STATUS__DELETING, STREAM_STATUS__DELETING,
}; };
enum { enum {
STREAM_RUNNER__RUNNING = 1, STREAM_TASK_STATUS__RUNNING = 1,
STREAM_RUNNER__STOP, STREAM_TASK_STATUS__STOP,
}; };
typedef struct {
SHashObj* pHash; // taskId -> streamTask
} SStreamMeta;
typedef struct SSnode { typedef struct SSnode {
SSnodeOpt cfg; SStreamMeta* pMeta;
SSnodeOpt cfg;
} SSnode; } SSnode;
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
int32_t taskId;
int32_t IdxInLevel; int32_t IdxInLevel;
int32_t level; int32_t level;
} SStreamInfo; } SStreamTaskInfo;
typedef struct { typedef struct {
SStreamInfo meta; SStreamTaskInfo meta;
int8_t status; int8_t status;
void* executor; void* executor;
STaosQueue* queue; void* stateStore;
void* stateStore;
// storage handle // storage handle
} SStreamRunner; } SStreamTask;
typedef struct {
SHashObj* pHash;
} SStreamMeta;
int32_t sndCreateStream(); int32_t sndCreateStream();
int32_t sndDropStream(); int32_t sndDropStream();
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "sndInt.h" #include "sndInt.h"
#include "tuuid.h"
SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
SSnode *pSnode = calloc(1, sizeof(SSnode)); SSnode *pSnode = calloc(1, sizeof(SSnode));
...@@ -32,6 +33,16 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ...@@ -32,6 +33,16 @@ int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
void sndDestroy(const char *path) {} void sndDestroy(const char *path) {}
static int32_t sndDeployTask(SSnode *pSnode, SRpcMsg *pMsg) {
SStreamTask *task = malloc(sizeof(SStreamTask));
if (task == NULL) {
return -1;
}
task->meta.taskId = tGenIdPI32();
taosHashPut(pSnode->pMeta->pHash, &task->meta.taskId, sizeof(int32_t), &task, sizeof(void *));
return 0;
}
int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) { int32_t sndProcessUMsg(SSnode *pSnode, SRpcMsg *pMsg) {
// stream deployment // stream deployment
// stream stop/resume // stream stop/resume
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tuuid.h"
static int64_t hashId = 0;
static int32_t SerialNo = 0;
int32_t tGenIdPI32(void) {
if (hashId == 0) {
char uid[64];
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&SerialNo, 1);
int32_t id = ((hashId & 0x1F) << 26) | ((pid & 0x3F) << 20) | ((ts & 0xFFF) << 8) | (val & 0xFF);
return id;
}
int64_t tGenIdPI64(void) {
if (hashId == 0) {
char uid[64];
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
if (code != TSDB_CODE_SUCCESS) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
} else {
hashId = MurmurHash3_32(uid, strlen(uid));
}
}
int64_t ts = taosGetTimestampMs();
uint64_t pid = taosGetPId();
int32_t val = atomic_add_fetch_32(&SerialNo, 1);
int64_t id = ((hashId & 0x07FF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF);
return id;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册