提交 aa77115f 编写于 作者: H hjxilinx

[TD-32]

...@@ -225,17 +225,17 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { ...@@ -225,17 +225,17 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb; vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL; vnodeObj.replica = NULL;
vnodeObj.events = NULL; vnodeObj.events = NULL;
vnodeObj.cq = NULL; vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
dTrace("open vnode:%d in %s", vnodeObj.vgId, rootDir); dTrace("open vnode:%d in %s", pVnode->vgId, rootDir);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -314,17 +314,17 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -314,17 +314,17 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) {
vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.status = TSDB_VN_STATUS_NOT_READY;
vnodeObj.refCount = 1; vnodeObj.refCount = 1;
vnodeObj.version = 0; vnodeObj.version = 0;
vnodeObj.wworker = dnodeAllocateWriteWorker();
vnodeObj.rworker = dnodeAllocateReadWorker();
vnodeObj.wal = NULL; vnodeObj.wal = NULL;
vnodeObj.tsdb = pTsdb; vnodeObj.tsdb = pTsdb;
vnodeObj.replica = NULL; vnodeObj.replica = NULL;
vnodeObj.events = NULL; vnodeObj.events = NULL;
vnodeObj.cq = NULL; vnodeObj.cq = NULL;
taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj));
pVnode->wworker = dnodeAllocateWriteWorker(pVnode);
pVnode->rworker = dnodeAllocateReadWorker(pVnode);
dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId); dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
......
...@@ -36,16 +36,15 @@ typedef struct { ...@@ -36,16 +36,15 @@ typedef struct {
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
void *pVnode;
SRpcContext *pRpcContext; // RPC message context SRpcContext *pRpcContext; // RPC message context
} SReadMsg; } SReadMsg;
static void *dnodeProcessReadQueue(void *param); static void *dnodeProcessReadQueue(void *param);
static void dnodeProcessReadResult(SReadMsg *pRead); static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead);
static void dnodeHandleIdleReadWorker(); static void dnodeHandleIdleReadWorker();
static void dnodeProcessQueryMsg(SReadMsg *pMsg); static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg);
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg);
static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode);
// module global variable // module global variable
static taos_qset readQset; static taos_qset readQset;
...@@ -104,23 +103,19 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -104,23 +103,19 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SReadMsg readMsg = { SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
.rpcMsg = *pMsg, pRead->rpcMsg = *pMsg;
.pCont = pCont, pRead->pCont = pCont;
.contLen = pHead->contLen, pRead->contLen = pHead->contLen;
.pRpcContext = pRpcContext, pRead->pRpcContext = pRpcContext;
.pVnode = pVnode,
};
taos_queue queue = dnodeGetVnodeRworker(pVnode); taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, &readMsg); taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// next vnode // next vnode
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
queuedMsgNum++; queuedMsgNum++;
dnodeReleaseVnode(pVnode);
} }
if (queuedMsgNum == 0) { if (queuedMsgNum == 0) {
...@@ -135,11 +130,11 @@ void dnodeRead(SRpcMsg *pMsg) { ...@@ -135,11 +130,11 @@ void dnodeRead(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateReadWorker() { void *dnodeAllocateReadWorker(void *pVnode) {
taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); taos_queue *queue = taosOpenQueue(sizeof(SReadMsg));
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
taosAddIntoQset(readQset, queue); taosAddIntoQset(readQset, queue, pVnode);
// spawn a thread to process queue // spawn a thread to process queue
if (threads < maxThreads) { if (threads < maxThreads) {
...@@ -164,20 +159,27 @@ void dnodeFreeReadWorker(void *rqueue) { ...@@ -164,20 +159,27 @@ void dnodeFreeReadWorker(void *rqueue) {
static void *dnodeProcessReadQueue(void *param) { static void *dnodeProcessReadQueue(void *param) {
taos_qset qset = (taos_qset)param; taos_qset qset = (taos_qset)param;
SReadMsg readMsg; SReadMsg *pReadMsg;
int type;
void *pVnode;
while (1) { while (1) {
if (taosReadQitemFromQset(qset, &readMsg) <= 0) { if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) {
dnodeHandleIdleReadWorker(); dnodeHandleIdleReadWorker();
continue; continue;
} }
terrno = 0; terrno = 0;
if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) {
(*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
// dnodeProcessReadResult(pVnode, pReadMsg);
taosFreeQitem(pReadMsg);
dnodeReleaseVnode(pVnode);
} }
return NULL; return NULL;
...@@ -195,11 +197,11 @@ static void dnodeHandleIdleReadWorker() { ...@@ -195,11 +197,11 @@ static void dnodeHandleIdleReadWorker() {
} }
} }
static void dnodeProcessReadResult(SReadMsg *pRead) { static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) {
SRpcContext *pRpcContext = pRead->pRpcContext; SRpcContext *pRpcContext = pRead->pRpcContext;
int32_t code = 0; int32_t code = 0;
dnodeReleaseVnode(pRead->pVnode); dnodeReleaseVnode(pVnode);
if (pRpcContext) { if (pRpcContext) {
if (terrno) { if (terrno) {
...@@ -218,34 +220,48 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { ...@@ -218,34 +220,48 @@ static void dnodeProcessReadResult(SReadMsg *pRead) {
code = terrno; code = terrno;
} }
//TODO: query handle is returned by dnodeProcessQueryMsg
if (0) {
SRpcMsg rsp; SRpcMsg rsp;
rsp.handle = pRead->rpcMsg.handle; rsp.handle = pRead->rpcMsg.handle;
rsp.code = code; rsp.code = code;
rsp.pCont = NULL; rsp.pCont = NULL;
rpcSendResponse(&rsp); rpcSendResponse(&rsp);
}
rpcFreeCont(pRead->rpcMsg.pCont); // free the received message rpcFreeCont(pRead->rpcMsg.pCont); // free the received message
} }
static void dnodeContinueExecuteQuery(void* qhandle, SReadMsg *pMsg) { static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) {
SReadMsg readMsg = {
.rpcMsg = {.msgType = TSDB_MSG_TYPE_QUERY},
.pCont = qhandle,
.contLen = 0,
.pRpcContext = pMsg->pRpcContext,
.pVnode = pMsg->pVnode,
};
taos_queue queue = dnodeGetVnodeRworker(pMsg->pVnode); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
taosWriteQitem(queue, &readMsg); pRead->rpcMsg = pMsg->rpcMsg;
pRead->pCont = qhandle;
pRead->contLen = 0;
pRead->pRpcContext = pMsg->pRpcContext;
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
taos_queue queue = dnodeGetVnodeRworker(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead);
// SReadMsg readMsg = {
// .rpcMsg = {0},
// .pCont = qhandle,
// .contLen = 0,
// .pRpcContext = pMsg->pRpcContext,
// };
//
// taos_queue queue = dnodeGetVnodeRworker(pVnode);
// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg);
} }
static void dnodeProcessQueryMsg(SReadMsg *pMsg) { static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) {
SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont;
SQInfo* pQInfo = NULL; SQInfo* pQInfo = NULL;
if (pMsg->rpcMsg.contLen != 0) { if (pMsg->contLen != 0) {
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); void* tsdb = dnodeGetVnodeTsdb(pVnode);
int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, pMsg, &pQInfo); int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo);
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
pRsp->code = code; pRsp->code = code;
...@@ -264,16 +280,14 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { ...@@ -264,16 +280,14 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) {
pQInfo = pMsg->pCont; pQInfo = pMsg->pCont;
} }
// do execute query qTableQuery(pQInfo); // do execute query
qTableQuery(pQInfo);
} }
static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) {
SRetrieveTableMsg *pRetrieve = pMsg->pCont; SRetrieveTableMsg *pRetrieve = pMsg->pCont;
void *pQInfo = (void*) htobe64(pRetrieve->qhandle); void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId);
int32_t contLen = 0; int32_t contLen = 0;
SRetrieveTableRsp *pRsp = NULL; SRetrieveTableRsp *pRsp = NULL;
...@@ -288,8 +302,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -288,8 +302,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
// todo check code and handle error in build result set // todo check code and handle error in build result set
code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen);
if (qNeedFurtherExec(pQInfo)) { if (qHasMoreResultsToRetrieve(pQInfo)) {
dnodeContinueExecuteQuery(pQInfo, pMsg); dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg);
} else { // no further execution invoked, release the ref to vnode
dnodeProcessReadResult(pVnode, pMsg);
} }
} }
...@@ -302,7 +318,4 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { ...@@ -302,7 +318,4 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) {
}; };
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
//todo merge result should be done here
//dnodeProcessReadResult(&readMsg);
} }
...@@ -35,7 +35,6 @@ typedef struct _write { ...@@ -35,7 +35,6 @@ typedef struct _write {
void *pCont; void *pCont;
int32_t contLen; int32_t contLen;
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
void *pVnode; // pointer to vnode
SRpcContext *pRpcContext; // RPC message context SRpcContext *pRpcContext; // RPC message context
} SWriteMsg; } SWriteMsg;
...@@ -51,15 +50,15 @@ typedef struct _thread_obj { ...@@ -51,15 +50,15 @@ typedef struct _thread_obj {
SWriteWorker *writeWorker; SWriteWorker *writeWorker;
} SWriteWorkerPool; } SWriteWorkerPool;
static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *); static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *);
static void *dnodeProcessWriteQueue(void *param); static void *dnodeProcessWriteQueue(void *param);
static void dnodeHandleIdleWorker(SWriteWorker *pWorker); static void dnodeHandleIdleWorker(SWriteWorker *pWorker);
static void dnodeProcessWriteResult(SWriteMsg *pWrite); static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite);
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg); static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg);
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg); static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg);
SWriteWorkerPool wWorkerPool; SWriteWorkerPool wWorkerPool;
...@@ -116,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -116,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) {
} }
// put message into queue // put message into queue
SWriteMsg writeMsg; SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
writeMsg.rpcMsg = *pMsg; pWrite->rpcMsg = *pMsg;
writeMsg.pCont = pCont; pWrite->pCont = pCont;
writeMsg.contLen = pHead->contLen; pWrite->contLen = pHead->contLen;
writeMsg.pRpcContext = pRpcContext; pWrite->pRpcContext = pRpcContext;
writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later
taos_queue queue = dnodeGetVnodeWworker(pVnode); taos_queue queue = dnodeGetVnodeWworker(pVnode);
taosWriteQitem(queue, &writeMsg); taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite);
// next vnode // next vnode
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
...@@ -144,16 +142,16 @@ void dnodeWrite(SRpcMsg *pMsg) { ...@@ -144,16 +142,16 @@ void dnodeWrite(SRpcMsg *pMsg) {
} }
} }
void *dnodeAllocateWriteWorker() { void *dnodeAllocateWriteWorker(void *pVnode) {
SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId;
taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); taos_queue *queue = taosOpenQueue();
if (queue == NULL) return NULL; if (queue == NULL) return NULL;
if (pWorker->qset == NULL) { if (pWorker->qset == NULL) {
pWorker->qset = taosOpenQset(); pWorker->qset = taosOpenQset();
if (pWorker->qset == NULL) return NULL; if (pWorker->qset == NULL) return NULL;
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
pthread_attr_t thAttr; pthread_attr_t thAttr;
...@@ -165,7 +163,7 @@ void *dnodeAllocateWriteWorker() { ...@@ -165,7 +163,7 @@ void *dnodeAllocateWriteWorker() {
taosCloseQset(pWorker->qset); taosCloseQset(pWorker->qset);
} }
} else { } else {
taosAddIntoQset(pWorker->qset, queue); taosAddIntoQset(pWorker->qset, queue, pVnode);
wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max;
} }
...@@ -181,19 +179,23 @@ void dnodeFreeWriteWorker(void *wqueue) { ...@@ -181,19 +179,23 @@ void dnodeFreeWriteWorker(void *wqueue) {
static void *dnodeProcessWriteQueue(void *param) { static void *dnodeProcessWriteQueue(void *param) {
SWriteWorker *pWorker = (SWriteWorker *)param; SWriteWorker *pWorker = (SWriteWorker *)param;
taos_qall qall; taos_qall qall;
SWriteMsg writeMsg; SWriteMsg *pWriteMsg;
int32_t numOfMsgs; int32_t numOfMsgs;
int type;
void *pVnode;
qall = taosAllocateQall();
while (1) { while (1) {
numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode);
if (numOfMsgs <=0) { if (numOfMsgs <=0) {
dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore
continue; continue;
} }
for (int32_t i=0; i<numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
// retrieve all items, and write them into WAL // retrieve all items, and write them into WAL
taosGetQitem(qall, &writeMsg); taosGetQitem(qall, &type, (void **)&pWriteMsg);
// walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); // walWrite(pVnode->whandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen);
} }
...@@ -204,30 +206,30 @@ static void *dnodeProcessWriteQueue(void *param) { ...@@ -204,30 +206,30 @@ static void *dnodeProcessWriteQueue(void *param) {
// browse all items, and process them one by one // browse all items, and process them one by one
taosResetQitems(qall); taosResetQitems(qall);
for (int32_t i = 0; i < numOfMsgs; ++i) { for (int32_t i = 0; i < numOfMsgs; ++i) {
taosGetQitem(qall, &writeMsg); taosGetQitem(qall, &type, (void **)&pWriteMsg);
terrno = 0; terrno = 0;
if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) { if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) {
(*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg); (*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg);
} else { } else {
terrno = TSDB_CODE_MSG_NOT_PROCESSED; terrno = TSDB_CODE_MSG_NOT_PROCESSED;
} }
dnodeProcessWriteResult(&writeMsg); dnodeProcessWriteResult(pVnode, pWriteMsg);
taosFreeQitem(pWriteMsg);
} }
// free the Qitems;
taosFreeQitems(qall);
} }
taosFreeQall(qall);
return NULL; return NULL;
} }
static void dnodeProcessWriteResult(SWriteMsg *pWrite) { static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) {
SRpcContext *pRpcContext = pWrite->pRpcContext; SRpcContext *pRpcContext = pWrite->pRpcContext;
int32_t code = 0; int32_t code = 0;
dnodeReleaseVnode(pWrite->pVnode); dnodeReleaseVnode(pVnode);
if (pRpcContext) { if (pRpcContext) {
if (terrno) { if (terrno) {
...@@ -267,7 +269,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { ...@@ -267,7 +269,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
} }
} }
static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
dTrace("submit msg is disposed"); dTrace("submit msg is disposed");
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg)); SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
...@@ -276,7 +278,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { ...@@ -276,7 +278,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
pRsp->affectedRows = htonl(1); pRsp->affectedRows = htonl(1);
pRsp->numOfFailedBlocks = 0; pRsp->numOfFailedBlocks = 0;
void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); void* tsdb = dnodeGetVnodeTsdb(pVnode);
assert(tsdb != NULL); assert(tsdb != NULL);
tsdbInsertData(tsdb, pMsg->pCont); tsdbInsertData(tsdb, pMsg->pCont);
...@@ -292,7 +294,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { ...@@ -292,7 +294,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) {
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -341,16 +343,16 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { ...@@ -341,16 +343,16 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) {
tsdbTableSetTagValue(&tCfg, dataRow, false); tsdbTableSetTagValue(&tCfg, dataRow, false);
} }
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
dnodeReleaseVnode(pMsg->pVnode); dnodeReleaseVnode(pVnode);
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -360,16 +362,16 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { ...@@ -360,16 +362,16 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) {
.tid = htonl(pTable->sid) .tid = htonl(pTable->sid)
}; };
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbDropTable(pTsdb, tableId); rpcRsp.code = tsdbDropTable(pTsdb, tableId);
dnodeReleaseVnode(pMsg->pVnode); dnodeReleaseVnode(pVnode);
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -418,16 +420,16 @@ static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { ...@@ -418,16 +420,16 @@ static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) {
tsdbTableSetTagValue(&tCfg, dataRow, false); tsdbTableSetTagValue(&tCfg, dataRow, false);
} }
void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); void *pTsdb = dnodeGetVnodeTsdb(pVnode);
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
dnodeReleaseVnode(pMsg->pVnode); dnodeReleaseVnode(pVnode);
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
} }
static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
...@@ -439,7 +441,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { ...@@ -439,7 +441,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) {
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
rpcRsp.code = TSDB_CODE_SUCCESS; rpcRsp.code = TSDB_CODE_SUCCESS;
dnodeReleaseVnode(pMsg->pVnode); dnodeReleaseVnode(pVnode);
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code)); dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
rpcSendResponse(&rpcRsp); rpcSendResponse(&rpcRsp);
......
...@@ -176,7 +176,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); ...@@ -176,7 +176,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
#define TSDB_MAX_COLUMNS 256 #define TSDB_MAX_COLUMNS 256
#define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns #define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns
#define TSDB_DNODE_NAME_LEN 63 #define TSDB_DNODE_NAME_LEN 64
#define TSDB_TABLE_NAME_LEN 192 #define TSDB_TABLE_NAME_LEN 192
#define TSDB_DB_NAME_LEN 32 #define TSDB_DB_NAME_LEN 32
#define TSDB_COL_NAME_LEN 64 #define TSDB_COL_NAME_LEN 64
......
...@@ -46,6 +46,7 @@ int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); ...@@ -46,6 +46,7 @@ int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName);
int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable);
void mgmtDropAllSuperTables(SDbObj *pDropDb); void mgmtDropAllSuperTables(SDbObj *pDropDb);
int32_t mgmtExtractTableName(const char* tableId, char* name);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -524,7 +524,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v ...@@ -524,7 +524,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v
} }
memset(stableName, 0, tListLen(stableName)); memset(stableName, 0, tListLen(stableName));
extractTableName(pTable->tableId, stableName); mgmtExtractTableName(pTable->tableId, stableName);
if (pShow->payloadLen > 0 && if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH)
...@@ -624,3 +624,17 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg ...@@ -624,3 +624,17 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mgmtExtractTableName(const char* tableId, char* name) {
int pos = -1;
int num = 0;
for (pos = 0; tableId[pos] != 0; ++pos) {
if (tableId[pos] == '.') num++;
if (num == 2) break;
}
if (num == 2) {
strcpy(name, tableId + pos + 1);
}
return 0;
}
...@@ -310,7 +310,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -310,7 +310,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
numOfRead++; numOfRead++;
// pattern compare for meter name // pattern compare for meter name
extractTableName(tableId, tableName); mgmtExtractTableName(tableId, tableName);
if (pShow->payloadLen > 0 && if (pShow->payloadLen > 0 &&
patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) {
...@@ -333,7 +333,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * ...@@ -333,7 +333,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void *
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
if (superTableId != NULL) { if (superTableId != NULL) {
extractTableName(superTableId, pWrite); mgmtExtractTableName(superTableId, pWrite);
} }
cols++; cols++;
......
...@@ -172,7 +172,7 @@ typedef struct SQueryRuntimeEnv { ...@@ -172,7 +172,7 @@ typedef struct SQueryRuntimeEnv {
typedef struct SQInfo { typedef struct SQInfo {
void* signature; void* signature;
void* param; // pointer to the RpcReadMsg // void* param; // pointer to the RpcReadMsg
TSKEY startTime; TSKEY startTime;
TSKEY elapsedTime; TSKEY elapsedTime;
int32_t pointsInterpo; int32_t pointsInterpo;
...@@ -236,6 +236,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c ...@@ -236,6 +236,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c
* @param pQInfo * @param pQInfo
* @return * @return
*/ */
bool qNeedFurtherExec(SQInfo* pQInfo); bool qHasMoreResultsToRetrieve(SQInfo* pQInfo);
#endif // TDENGINE_QUERYEXECUTOR_H #endif // TDENGINE_QUERYEXECUTOR_H
...@@ -6102,7 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param ...@@ -6102,7 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param
// pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code);
} else { } else {
code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo);
(*pQInfo)->param = param; // (*pQInfo)->param = param;
} }
_query_over: _query_over:
...@@ -6206,7 +6206,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { ...@@ -6206,7 +6206,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) {
// todo if interpolation exists, the result may be dump to client by several rounds // todo if interpolation exists, the result may be dump to client by several rounds
} }
bool qNeedFurtherExec(SQInfo* pQInfo) { bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) {
if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
......
...@@ -32,6 +32,7 @@ ...@@ -32,6 +32,7 @@
#include "rpcServer.h" #include "rpcServer.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "trpc.h" #include "trpc.h"
#include "hash.h"
#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
#define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead)))
...@@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) { ...@@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) {
} }
if (pRpc->connType == TAOS_CONN_SERVER) { if (pRpc->connType == TAOS_CONN_SERVER) {
pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); // pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString);
pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true);
if (pRpc->hash == NULL) { if (pRpc->hash == NULL) {
tError("%s failed to init string hash", pRpc->label); tError("%s failed to init string hash", pRpc->label);
rpcClose(pRpc); rpcClose(pRpc);
...@@ -292,7 +294,8 @@ void rpcClose(void *param) { ...@@ -292,7 +294,8 @@ void rpcClose(void *param) {
} }
} }
taosCleanUpStrHash(pRpc->hash); // taosCleanUpStrHash(pRpc->hash);
taosHashCleanup(pRpc->hash);
taosTmrCleanUp(pRpc->tmrCtrl); taosTmrCleanUp(pRpc->tmrCtrl);
taosIdPoolCleanUp(pRpc->idPool); taosIdPoolCleanUp(pRpc->idPool);
rpcCloseConnCache(pRpc->pCache); rpcCloseConnCache(pRpc->pCache);
...@@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) { ...@@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) {
if ( pRpc->connType == TAOS_CONN_SERVER) { if ( pRpc->connType == TAOS_CONN_SERVER) {
char hashstr[40] = {0}; char hashstr[40] = {0};
sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType);
taosDeleteStrHash(pRpc->hash, hashstr); // taosDeleteStrHash(pRpc->hash, hashstr);
// taosHashRemove(pRpc->hash, hashstr, size);
rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg
pConn->pRspMsg = NULL; pConn->pRspMsg = NULL;
pConn->inType = 0; pConn->inType = 0;
...@@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
char hashstr[40] = {0}; char hashstr[40] = {0};
SRpcHead *pHead = (SRpcHead *)pRecv->msg; SRpcHead *pHead = (SRpcHead *)pRecv->msg;
sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType);
// check if it is already allocated // check if it is already allocated
SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); // SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr));
SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size));
if (ppConn) pConn = *ppConn; if (ppConn) pConn = *ppConn;
if (pConn) return pConn; if (pConn) return pConn;
...@@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { ...@@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn->localPort = (pRpc->localPort + pRpc->index); pConn->localPort = (pRpc->localPort + pRpc->index);
} }
taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); // taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn);
taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES);
tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u",
pRpc->label, pConn, sid, pConn->user, pConn->localPort); pRpc->label, pConn, sid, pConn->user, pConn->localPort);
} }
......
...@@ -28,10 +28,13 @@ void *qhandle = NULL; ...@@ -28,10 +28,13 @@ void *qhandle = NULL;
void processShellMsg() { void processShellMsg() {
static int num = 0; static int num = 0;
taos_qall qall; taos_qall qall;
SRpcMsg rpcMsg; SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
while (1) { while (1) {
int numOfMsgs = taosReadAllQitems(qhandle, &qall); int numOfMsgs = taosReadAllQitems(qhandle, qall);
if (numOfMsgs <= 0) { if (numOfMsgs <= 0) {
usleep(1000); usleep(1000);
continue; continue;
...@@ -40,10 +43,10 @@ void processShellMsg() { ...@@ -40,10 +43,10 @@ void processShellMsg() {
tTrace("%d shell msgs are received", numOfMsgs); tTrace("%d shell msgs are received", numOfMsgs);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg); taosGetQitem(qall, &type, (void **)&pRpcMsg);
if (dataFd >=0) { if (dataFd >=0) {
if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
tPrint("failed to write data file, reason:%s", strerror(errno)); tPrint("failed to write data file, reason:%s", strerror(errno));
} }
} }
...@@ -62,19 +65,22 @@ void processShellMsg() { ...@@ -62,19 +65,22 @@ void processShellMsg() {
taosResetQitems(qall); taosResetQitems(qall);
for (int i=0; i<numOfMsgs; ++i) { for (int i=0; i<numOfMsgs; ++i) {
taosGetQitem(qall, &rpcMsg);
rpcFreeCont(rpcMsg.pCont); taosGetQitem(qall, &type, (void **)&pRpcMsg);
rpcFreeCont(pRpcMsg->pCont);
rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.pCont = rpcMallocCont(msgSize);
rpcMsg.contLen = msgSize; rpcMsg.contLen = msgSize;
rpcMsg.handle = rpcMsg.handle; rpcMsg.handle = pRpcMsg->handle;
rpcMsg.code = 1; rpcMsg.code = 1;
rpcSendResponse(&rpcMsg); rpcSendResponse(&rpcMsg);
taosFreeQitem(pRpcMsg);
} }
taosFreeQitems(qall);
} }
taosFreeQall(qall);
/* /*
SRpcIpSet ipSet; SRpcIpSet ipSet;
ipSet.numOfIps = 1; ipSet.numOfIps = 1;
...@@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char ...@@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char
} }
void processRequestMsg(SRpcMsg *pMsg) { void processRequestMsg(SRpcMsg *pMsg) {
tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); SRpcMsg *pTemp;
taosWriteQitem(qhandle, pMsg);
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp);
} }
int main(int argc, char *argv[]) { int main(int argc, char *argv[]) {
...@@ -143,6 +154,7 @@ int main(int argc, char *argv[]) { ...@@ -143,6 +154,7 @@ int main(int argc, char *argv[]) {
commit = atoi(argv[++i]); commit = atoi(argv[++i]);
} else if (strcmp(argv[i], "-d")==0 && i < argc-1) { } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
rpcDebugFlag = atoi(argv[++i]); rpcDebugFlag = atoi(argv[++i]);
ddebugFlag = rpcDebugFlag;
uDebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag;
} else { } else {
printf("\nusage: %s [options] \n", argv[0]); printf("\nusage: %s [options] \n", argv[0]);
......
...@@ -78,7 +78,7 @@ void taosResetLogFile(); ...@@ -78,7 +78,7 @@ void taosResetLogFile();
// utility log function // utility log function
#define pError(...) \ #define pError(...) \
if (uDebugFlag & DEBUG_ERROR) { \ if (uDebugFlag & DEBUG_ERROR) { \
tprintf("ERROR UTL ", 255, __VA_ARGS__); \ tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \
} }
#define pWarn(...) \ #define pWarn(...) \
if (uDebugFlag & DEBUG_WARN) { \ if (uDebugFlag & DEBUG_WARN) { \
......
...@@ -20,28 +20,35 @@ ...@@ -20,28 +20,35 @@
extern "C" { extern "C" {
#endif #endif
#define TAOS_QTYPE_RPC 0
#define TAOS_QTYPE_FWD 1
#define TAOS_QTYPE_WAL 2
typedef void* taos_queue; typedef void* taos_queue;
typedef void* taos_qset; typedef void* taos_qset;
typedef void* taos_qall; typedef void* taos_qall;
taos_queue taosOpenQueue(int itemSize); taos_queue taosOpenQueue();
void taosCloseQueue(taos_queue); void taosCloseQueue(taos_queue);
int taosWriteQitem(taos_queue, void *item); void *taosAllocateQitem(int size);
int taosReadQitem(taos_queue, void *item); void taosFreeQitem(void *item);
int taosWriteQitem(taos_queue, int type, void *item);
int taosReadAllQitems(taos_queue, taos_qall *); int taosReadQitem(taos_queue, int *type, void **pitem);
int taosGetQitem(taos_qall, void *item);
taos_qall taosAllocateQall();
void taosFreeQall(taos_qall);
int taosReadAllQitems(taos_queue, taos_qall);
int taosGetQitem(taos_qall, int *type, void **pitem);
void taosResetQitems(taos_qall); void taosResetQitems(taos_qall);
void taosFreeQitems(taos_qall);
taos_qset taosOpenQset(); taos_qset taosOpenQset();
void taosCloseQset(); void taosCloseQset();
int taosAddIntoQset(taos_qset, taos_queue); int taosAddIntoQset(taos_qset, taos_queue, void *ahandle);
void taosRemoveFromQset(taos_qset, taos_queue); void taosRemoveFromQset(taos_qset, taos_queue);
int taosGetQueueNumber(taos_qset); int taosGetQueueNumber(taos_qset);
int taosReadQitemFromQset(taos_qset, void *item); int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle);
int taosReadAllQitemsFromQset(taos_qset, taos_qall *); int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle);
int taosGetQueueItemsNumber(taos_queue param); int taosGetQueueItemsNumber(taos_queue param);
int taosGetQsetItemsNumber(taos_qset param); int taosGetQsetItemsNumber(taos_qset param);
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#include "tqueue.h" #include "tqueue.h"
typedef struct _taos_qnode { typedef struct _taos_qnode {
int type;
struct _taos_qnode *next; struct _taos_qnode *next;
char item[]; char item[];
} STaosQnode; } STaosQnode;
...@@ -30,6 +31,7 @@ typedef struct _taos_q { ...@@ -30,6 +31,7 @@ typedef struct _taos_q {
struct _taos_qnode *tail; struct _taos_qnode *tail;
struct _taos_q *next; // for queue set struct _taos_q *next; // for queue set
struct _taos_qset *qset; // for queue set struct _taos_qset *qset; // for queue set
void *ahandle; // for queue set
pthread_mutex_t mutex; pthread_mutex_t mutex;
} STaosQueue; } STaosQueue;
...@@ -48,7 +50,7 @@ typedef struct _taos_qall { ...@@ -48,7 +50,7 @@ typedef struct _taos_qall {
int32_t numOfItems; int32_t numOfItems;
} STaosQall; } STaosQall;
taos_queue taosOpenQueue(int itemSize) { taos_queue taosOpenQueue() {
STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1);
if (queue == NULL) { if (queue == NULL) {
...@@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) { ...@@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) {
} }
pthread_mutex_init(&queue->mutex, NULL); pthread_mutex_init(&queue->mutex, NULL);
queue->itemSize = (int32_t)itemSize;
return queue; return queue;
} }
...@@ -83,16 +83,26 @@ void taosCloseQueue(taos_queue param) { ...@@ -83,16 +83,26 @@ void taosCloseQueue(taos_queue param) {
free(queue); free(queue);
} }
int taosWriteQitem(taos_queue param, void *item) { void *taosAllocateQitem(int size) {
STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1);
if (pNode == NULL) return NULL;
return (void *)pNode->item;
}
STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1); void taosFreeQitem(void *param) {
if ( pNode == NULL ) { if (param == NULL) return;
terrno = TSDB_CODE_NO_RESOURCE;
return -1; //pTrace("item:%p is freed", param);
}
memcpy(pNode->item, item, queue->itemSize); char *temp = (char *)param;
temp -= sizeof(STaosQnode);
free(temp);
}
int taosWriteQitem(taos_queue param, int type, void *item) {
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode));
pNode->type = type;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
...@@ -107,12 +117,14 @@ int taosWriteQitem(taos_queue param, void *item) { ...@@ -107,12 +117,14 @@ int taosWriteQitem(taos_queue param, void *item) {
queue->numOfItems++; queue->numOfItems++;
if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1);
//pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems);
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
return 0; return 0;
} }
int taosReadQitem(taos_queue param, void *item) { int taosReadQitem(taos_queue param, int *type, void **pitem) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
...@@ -121,14 +133,15 @@ int taosReadQitem(taos_queue param, void *item) { ...@@ -121,14 +133,15 @@ int taosReadQitem(taos_queue param, void *item) {
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize); *pitem = pNode->item;
*type = pNode->type;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) if (queue->head == NULL)
queue->tail = NULL; queue->tail = NULL;
free(pNode);
queue->numOfItems--; queue->numOfItems--;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1);
code = 1; code = 1;
//pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems);
} }
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
...@@ -136,19 +149,24 @@ int taosReadQitem(taos_queue param, void *item) { ...@@ -136,19 +149,24 @@ int taosReadQitem(taos_queue param, void *item) {
return code; return code;
} }
int taosReadAllQitems(taos_queue param, taos_qall *res) { void *taosAllocateQall() {
void *p = malloc(sizeof(STaosQall));
return p;
}
void taosFreeQall(void *param) {
free(param);
}
int taosReadAllQitems(taos_queue param, taos_qall p2) {
STaosQueue *queue = (STaosQueue *)param; STaosQueue *queue = (STaosQueue *)param;
STaosQall *qall = NULL; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1); memset(qall, 0, sizeof(STaosQall));
if ( qall == NULL ) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head; qall->current = queue->head;
qall->start = queue->head; qall->start = queue->head;
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
...@@ -160,15 +178,13 @@ int taosReadAllQitems(taos_queue param, taos_qall *res) { ...@@ -160,15 +178,13 @@ int taosReadAllQitems(taos_queue param, taos_qall *res) {
queue->numOfItems = 0; queue->numOfItems = 0;
if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems);
} }
}
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
*res = qall;
return code; return code;
} }
int taosGetQitem(taos_qall param, void *item) { int taosGetQitem(taos_qall param, int *type, void **pitem) {
STaosQall *qall = (STaosQall *)param; STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode; STaosQnode *pNode;
int num = 0; int num = 0;
...@@ -178,8 +194,10 @@ int taosGetQitem(taos_qall param, void *item) { ...@@ -178,8 +194,10 @@ int taosGetQitem(taos_qall param, void *item) {
qall->current = pNode->next; qall->current = pNode->next;
if (pNode) { if (pNode) {
memcpy(item, pNode->item, qall->itemSize); *pitem = pNode->item;
*type = pNode->type;
num = 1; num = 1;
//pTrace("item:%p is fetched", *pitem);
} }
return num; return num;
...@@ -190,19 +208,6 @@ void taosResetQitems(taos_qall param) { ...@@ -190,19 +208,6 @@ void taosResetQitems(taos_qall param) {
qall->current = qall->start; qall->current = qall->start;
} }
void taosFreeQitems(taos_qall param) {
STaosQall *qall = (STaosQall *)param;
STaosQnode *pNode;
while (qall->current) {
pNode = qall->current;
qall->current = pNode->next;
free(pNode);
}
free(qall);
}
taos_qset taosOpenQset() { taos_qset taosOpenQset() {
STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1);
...@@ -221,7 +226,7 @@ void taosCloseQset(taos_qset param) { ...@@ -221,7 +226,7 @@ void taosCloseQset(taos_qset param) {
free(qset); free(qset);
} }
int taosAddIntoQset(taos_qset p1, taos_queue p2) { int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) {
STaosQueue *queue = (STaosQueue *)p2; STaosQueue *queue = (STaosQueue *)p2;
STaosQset *qset = (STaosQset *)p1; STaosQset *qset = (STaosQset *)p1;
...@@ -230,6 +235,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) { ...@@ -230,6 +235,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) {
pthread_mutex_lock(&qset->mutex); pthread_mutex_lock(&qset->mutex);
queue->next = qset->head; queue->next = qset->head;
queue->ahandle = ahandle;
qset->head = queue; qset->head = queue;
qset->numOfQueues++; qset->numOfQueues++;
...@@ -283,7 +289,7 @@ int taosGetQueueNumber(taos_qset param) { ...@@ -283,7 +289,7 @@ int taosGetQueueNumber(taos_qset param) {
return ((STaosQset *)param)->numOfQueues; return ((STaosQset *)param)->numOfQueues;
} }
int taosReadQitemFromQset(taos_qset param, void *item) { int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQnode *pNode = NULL; STaosQnode *pNode = NULL;
int code = 0; int code = 0;
...@@ -301,11 +307,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) { ...@@ -301,11 +307,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
if (queue->head) { if (queue->head) {
pNode = queue->head; pNode = queue->head;
memcpy(item, pNode->item, queue->itemSize); *pitem = pNode->item;
*type = pNode->type;
*phandle = queue->ahandle;
queue->head = pNode->next; queue->head = pNode->next;
if (queue->head == NULL) if (queue->head == NULL)
queue->tail = NULL; queue->tail = NULL;
free(pNode);
queue->numOfItems--; queue->numOfItems--;
atomic_sub_fetch_32(&qset->numOfItems, 1); atomic_sub_fetch_32(&qset->numOfItems, 1);
code = 1; code = 1;
...@@ -318,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, void *item) { ...@@ -318,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, void *item) {
return code; return code;
} }
int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) {
STaosQset *qset = (STaosQset *)param; STaosQset *qset = (STaosQset *)param;
STaosQueue *queue; STaosQueue *queue;
STaosQall *qall = NULL; STaosQall *qall = (STaosQall *)p2;
int code = 0; int code = 0;
for(int i=0; i<qset->numOfQueues; ++i) { for(int i=0; i<qset->numOfQueues; ++i) {
...@@ -336,31 +343,24 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { ...@@ -336,31 +343,24 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) {
pthread_mutex_lock(&queue->mutex); pthread_mutex_lock(&queue->mutex);
if (queue->head) { if (queue->head) {
qall = (STaosQall *) calloc(sizeof(STaosQall), 1);
if (qall == NULL) {
terrno = TSDB_CODE_NO_RESOURCE;
code = -1;
} else {
qall->current = queue->head; qall->current = queue->head;
qall->start = queue->head; qall->start = queue->head;
qall->numOfItems = queue->numOfItems; qall->numOfItems = queue->numOfItems;
qall->itemSize = queue->itemSize; qall->itemSize = queue->itemSize;
code = qall->numOfItems; code = qall->numOfItems;
*phandle = queue->ahandle;
queue->head = NULL; queue->head = NULL;
queue->tail = NULL; queue->tail = NULL;
queue->numOfItems = 0; queue->numOfItems = 0;
atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems);
} }
}
pthread_mutex_unlock(&queue->mutex); pthread_mutex_unlock(&queue->mutex);
if (code != 0) break; if (code != 0) break;
} }
*res = qall;
return code; return code;
} }
......
...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
struct sockaddr_in serverAddr, clientAddr; struct sockaddr_in serverAddr, clientAddr;
int ret; int ret;
pTrace("open tcp client socket:%s:%d", destIp, destPort); // pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp);
sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
...@@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ...@@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) {
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret != 0) { if (ret != 0) {
pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); //pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd); taosCloseSocket(sockFd);
sockFd = -1; sockFd = -1;
} }
......
...@@ -34,20 +34,22 @@ typedef enum { ...@@ -34,20 +34,22 @@ typedef enum {
TSDB_FILE_TYPE_MAX TSDB_FILE_TYPE_MAX
} TSDB_FILE_TYPE; } TSDB_FILE_TYPE;
extern const char *tsdbFileSuffix[]; #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
typedef struct { extern const char *tsdbFileSuffix[];
int64_t size;
int64_t tombSize;
} SFileInfo;
typedef struct { typedef struct {
int8_t type; int8_t type;
int fd;
char fname[128]; char fname[128];
int64_t size; // total size of the file int64_t size; // total size of the file
int64_t tombSize; // unused file size int64_t tombSize; // unused file size
int32_t totalBlocks;
int32_t totalSubBlocks;
} SFile; } SFile;
#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1)
typedef struct { typedef struct {
int32_t fileId; int32_t fileId;
SFile files[TSDB_FILE_TYPE_MAX]; SFile files[TSDB_FILE_TYPE_MAX];
...@@ -55,14 +57,26 @@ typedef struct { ...@@ -55,14 +57,26 @@ typedef struct {
// TSDB file handle // TSDB file handle
typedef struct { typedef struct {
int32_t daysPerFile; int maxFGroups;
int32_t keep; int numOfFGroups;
int32_t minRowPerFBlock;
int32_t maxRowsPerFBlock;
int32_t maxTables;
SFileGroup fGroup[]; SFileGroup fGroup[];
} STsdbFileH; } STsdbFileH;
#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId
#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
typedef struct {
int32_t len;
int32_t padding; // For padding purpose
int64_t offset;
} SCompIdx;
/** /**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block * if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
...@@ -83,13 +97,31 @@ typedef struct { ...@@ -83,13 +97,31 @@ typedef struct {
TSKEY keyLast; TSKEY keyLast;
} SCompBlock; } SCompBlock;
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) typedef struct {
int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int64_t uid;
int32_t padding; // For padding purpose
int32_t numOfBlocks; // TODO: make the struct padding
SCompBlock blocks[];
} SCompInfo;
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, // TODO: take pre-calculation into account
int32_t maxRowsPerFBlock, int32_t maxTables); typedef struct {
int16_t colId; // Column ID
int16_t len; // Column length
int32_t type : 8;
int32_t offset : 24;
} SCompCol;
// TODO: Take recover into account
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
int64_t uid; // For recovery usage
SCompCol cols[];
} SCompData;
void tsdbCloseFile(STsdbFileH *pFileH);
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
#ifdef __cplusplus #ifdef __cplusplus
} }
......
...@@ -27,72 +27,126 @@ ...@@ -27,72 +27,126 @@
#define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_HEAD_SIZE 512
#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_DELIMITER 0xF00AFA0F
typedef struct {
int32_t len;
int32_t padding; // For padding purpose
int64_t offset;
} SCompIdx;
typedef struct {
int32_t delimiter; // For recovery usage
int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int64_t uid;
int32_t padding; // For padding purpose
int32_t numOfBlocks; // TODO: make the struct padding
SCompBlock blocks[];
} SCompInfo;
// TODO: take pre-calculation into account
typedef struct {
int16_t colId; // Column ID
int16_t len; // Column length
int32_t type : 8;
int32_t offset : 24;
} SCompCol;
// TODO: Take recover into account
typedef struct {
int32_t delimiter; // For recovery usage
int32_t numOfCols; // For recovery usage
int64_t uid; // For recovery usage
SCompCol cols[];
} SCompData;
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD ".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA ".data", // TSDB_FILE_TYPE_DATA
".last" // TSDB_FILE_TYPE_LAST ".last" // TSDB_FILE_TYPE_LAST
}; };
static int tsdbWriteFileHead(int fd, SFile *pFile) { static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname);
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) {
STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles);
if (pFileH == NULL) { // TODO: deal with ERROR here
return NULL;
}
pFileH->maxFGroups = maxFiles;
DIR *dir = opendir(dataDir);
if (dir == NULL) {
free(pFileH);
return NULL;
}
struct dirent *dp;
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue;
// TODO
}
return pFileH;
}
void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); }
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) ||
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) ==
NULL) {
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) {
// TODO: deal with the ERROR here, remove those creaed file
return -1;
}
}
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
}
return 0;
}
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
SFileGroup *pGroup =
bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey);
if (pGroup == NULL) return -1;
// Remove from disk
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
remove(pGroup->files[type].fname);
}
// Adjust the memory
int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1);
if (filesBehind > 0) {
memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind);
}
pFileH->numOfFGroups--;
return 0;
}
static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
SFileGroup *pFGroup = (SFileGroup *)fgroup;
return (fid - pFGroup->fileId);
}
static int compFGroup(const void *arg1, const void *arg2) {
return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId;
}
static int tsdbWriteFileHead(SFile *pFile) {
char head[TSDB_FILE_HEAD_SIZE] = "\0"; char head[TSDB_FILE_HEAD_SIZE] = "\0";
pFile->size += TSDB_FILE_HEAD_SIZE; pFile->size += TSDB_FILE_HEAD_SIZE;
// TODO: write version and File statistic to the head // TODO: write version and File statistic to the head
lseek(fd, 0, SEEK_SET); lseek(pFile->fd, 0, SEEK_SET);
if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
return 0; return 0;
} }
static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
int size = sizeof(SCompIdx) * maxTables; int size = sizeof(SCompIdx) * maxTables;
void *buf = calloc(1, size); void *buf = calloc(1, size);
if (buf == NULL) return -1; if (buf == NULL) return -1;
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
free(buf); free(buf);
return -1; return -1;
} }
if (write(fd, buf, size) < 0) { if (write(pFile->fd, buf, size) < 0) {
free(buf); free(buf);
return -1; return -1;
} }
pFile->size += size; pFile->size += size;
free(buf);
return 0; return 0;
} }
...@@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) ...@@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname)
return 0; return 0;
} }
/** static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function
* Create a file and set the SFile object if (TSDB_IS_FILE_OPENED(pFile)) return -1;
*/
pFile->fd = open(pFile->fname, oflag, 0755);
if (pFile->fd < 0) return -1;
return 0;
}
static int tsdbCloseFile(SFile *pFile) {
if (!TSDB_IS_FILE_OPENED(pFile)) return -1;
int ret = close(pFile->fd);
pFile->fd = -1;
return ret;
}
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
memset((void *)pFile, 0, sizeof(SFile)); memset((void *)pFile, 0, sizeof(SFile));
pFile->type = type; pFile->type = type;
pFile->fd = -1;
tsdbGetFileName(dataDir, fileId, type, pFile->fname); tsdbGetFileName(dataDir, fileId, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) { if (access(pFile->fname, F_OK) == 0) {
...@@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, ...@@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables,
return -1; return -1;
} }
int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) {
if (fd < 0) return -1; // TODO: deal with the ERROR here
if (type == TSDB_FILE_TYPE_HEAD) {
if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) {
close(fd);
return -1; return -1;
} }
}
if (tsdbWriteFileHead(fd, pFile) < 0) { if (type == TSDB_FILE_TYPE_HEAD) {
close(fd); if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) {
tsdbCloseFile(pFile);
return -1; return -1;
} }
}
close(fd); if (tsdbWriteFileHead(pFile) < 0) {
tsdbCloseFile(pFile);
return 0;
}
static int tsdbRemoveFile(SFile *pFile) {
if (pFile == NULL) return -1;
return remove(pFile->fname);
}
// Create a file group with fileId and return a SFileGroup object
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) {
if (dataDir == NULL || pFGroup == NULL) return -1;
memset((void *)pFGroup, 0, sizeof(SFileGroup));
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) {
// TODO: deal with the error here, remove the created files
return -1; return -1;
} }
}
pFGroup->fileId = fileId; tsdbCloseFile(pFile);
return 0; return 0;
} }
/**
* Initialize the TSDB file handle
*/
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
int32_t maxRowsPerFBlock, int32_t maxTables) {
STsdbFileH *pTsdbFileH =
(STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
if (pTsdbFileH == NULL) return NULL;
pTsdbFileH->daysPerFile = daysPerFile;
pTsdbFileH->keep = keep;
pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
pTsdbFileH->maxTables = maxTables;
// Open the directory to read information of each file
DIR *dir = opendir(dataDir);
if (dir == NULL) {
free(pTsdbFileH);
return NULL;
}
char fname[256];
struct dirent *dp;
while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
if (true /* check if the file is the .head file */) {
int fileId = 0;
int vgId = 0;
sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId);
// TODO
// Open head file
// Open data file
// Open last file
}
}
return pTsdbFileH;
}
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) { TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];
......
...@@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO ...@@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
char dataDir[128] = "\0"; char dataDir[128] = "\0";
tsdbGetDataDirName(pRepo, dataDir); tsdbGetDataDirName(pRepo, dataDir);
pRepo->tsdbFileH = pRepo->tsdbFileH =
tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); tsdbInitFileH(dataDir, pCfg->maxTables);
if (pRepo->tsdbFileH == NULL) { if (pRepo->tsdbFileH == NULL) {
free(pRepo->rootDir); free(pRepo->rootDir);
tsdbFreeCache(pRepo->tsdbCache); tsdbFreeCache(pRepo->tsdbCache);
...@@ -782,19 +782,51 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max ...@@ -782,19 +782,51 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
return numOfRows; return numOfRows;
} }
static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) {
if (iters == NULL) return;
for (int tid = 0; tid < maxTables; tid++) {
if (iters[tid] == NULL) continue;
tSkipListDestroyIter(iters[tid]);
}
free(iters);
}
static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) {
SSkipListIterator **iters = (SSkipListIterator *)calloc(maxTables, sizeof(SSkipListIterator *));
if (iters == NULL) return NULL;
for (int tid = 0; tid < maxTables; tid++) {
STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL) continue;
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
if (iters[tid] == NULL) {
tsdbDestroyTableIters(iters, maxTables);
return NULL;
}
if (!tSkipListIterNext(iters[tid])) {
assert(false);
}
}
return iters;
}
// Commit to file // Commit to file
static void *tsdbCommitToFile(void *arg) { static void *tsdbCommitToFile(void *arg) {
// TODO // TODO
printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache; STsdbCache *pCache = pRepo->tsdbCache;
STsdbCfg * pCfg = &(pRepo->config); STsdbCfg * pCfg = &(pRepo->config);
if (pCache->imem == NULL) return; if (pCache->imem == NULL) return;
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); // Create the iterator to read from cache
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
if (iters == NULL) { if (iters == NULL) {
// TODO: deal with the error // TODO: deal with the error
return NULL; return NULL;
...@@ -805,10 +837,15 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -805,10 +837,15 @@ static void *tsdbCommitToFile(void *arg) {
SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
for (int fid = sfid; fid <= efid; fid++) { for (int fid = sfid; fid <= efid; fid++) {
TSKEY minKey = 0, maxKey = 0; TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
// tsdbOpenFileForWrite(pRepo, fid);
for (int tid = 0; tid < pCfg->maxTables; tid++) { for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable *pTable = pMeta->tables[tid]; STable *pTable = pMeta->tables[tid];
if (pTable == NULL || pTable->imem == NULL) continue; if (pTable == NULL || pTable->imem == NULL) continue;
...@@ -837,14 +874,10 @@ static void *tsdbCommitToFile(void *arg) { ...@@ -837,14 +874,10 @@ static void *tsdbCommitToFile(void *arg) {
} }
} }
// Free the iterator tsdbDestroyTableIters(iters, pCfg->maxTables);
for (int tid = 0; tid < pCfg->maxTables; tid++) {
if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]);
}
free(buf); free(buf);
free(cols); free(cols);
free(iters);
tsdbLockRepo(arg); tsdbLockRepo(arg);
tdListMove(pCache->imem->list, pCache->pool.memPool); tdListMove(pCache->imem->list, pCache->pool.memPool);
......
...@@ -142,7 +142,7 @@ TEST(TsdbTest, DISABLED_openRepo) { ...@@ -142,7 +142,7 @@ TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, DISABLED_createFileGroup) { TEST(TsdbTest, DISABLED_createFileGroup) {
SFileGroup fGroup; SFileGroup fGroup;
ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); // ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
int k = 0; int k = 0;
} }
\ No newline at end of file
...@@ -64,6 +64,9 @@ int main(int argc, char *argv[]) { ...@@ -64,6 +64,9 @@ int main(int argc, char *argv[]) {
memset(buf, 0, 512); memset(buf, 0, 512);
} }
taos_close(taos);
getchar();
return 0; return 0;
taos_query(taos, "drop database demo"); taos_query(taos, "drop database demo");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册