diff --git a/src/dnode/src/dnodeMgmt.c b/src/dnode/src/dnodeMgmt.c index cf1de5eab9574cfbebb5810349c479afe697c079..4d335d73534b452b71753b63d6d1b814b0df674b 100644 --- a/src/dnode/src/dnodeMgmt.c +++ b/src/dnode/src/dnodeMgmt.c @@ -225,17 +225,17 @@ static int32_t dnodeOpenVnode(int32_t vnode, char *rootDir) { vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; - vnodeObj.wworker = dnodeAllocateWriteWorker(); - vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.wal = NULL; vnodeObj.tsdb = pTsdb; vnodeObj.replica = NULL; vnodeObj.events = NULL; vnodeObj.cq = NULL; - taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); + SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); + pVnode->wworker = dnodeAllocateWriteWorker(pVnode); + pVnode->rworker = dnodeAllocateReadWorker(pVnode); - dTrace("open vnode:%d in %s", vnodeObj.vgId, rootDir); + dTrace("open vnode:%d in %s", pVnode->vgId, rootDir); return TSDB_CODE_SUCCESS; } @@ -314,17 +314,17 @@ static int32_t dnodeCreateVnode(SMDCreateVnodeMsg *pVnodeCfg) { vnodeObj.status = TSDB_VN_STATUS_NOT_READY; vnodeObj.refCount = 1; vnodeObj.version = 0; - vnodeObj.wworker = dnodeAllocateWriteWorker(); - vnodeObj.rworker = dnodeAllocateReadWorker(); vnodeObj.wal = NULL; vnodeObj.tsdb = pTsdb; vnodeObj.replica = NULL; vnodeObj.events = NULL; vnodeObj.cq = NULL; - taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *) (&vnodeObj)); + SVnodeObj *pVnode = (SVnodeObj *)taosAddIntHash(tsDnodeVnodesHash, vnodeObj.vgId, (char *)(&vnodeObj)); + pVnode->wworker = dnodeAllocateWriteWorker(pVnode); + pVnode->rworker = dnodeAllocateReadWorker(pVnode); - dPrint("vgroup:%d, vnode:%d is created", vnodeObj.vgId, vnodeObj.vgId); + dPrint("vgroup:%d, vnode:%d is created", pVnode->vgId, pVnode->vgId); return TSDB_CODE_SUCCESS; } diff --git a/src/dnode/src/dnodeRead.c b/src/dnode/src/dnodeRead.c index 0ae093728ff1274e6f103755efc6d26cf87849e5..a23336630e5ef6314db72c4173029e2853566095 100644 --- a/src/dnode/src/dnodeRead.c +++ b/src/dnode/src/dnodeRead.c @@ -36,16 +36,15 @@ typedef struct { void *pCont; int32_t contLen; SRpcMsg rpcMsg; - void *pVnode; SRpcContext *pRpcContext; // RPC message context } SReadMsg; static void *dnodeProcessReadQueue(void *param); -static void dnodeProcessReadResult(SReadMsg *pRead); +static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead); static void dnodeHandleIdleReadWorker(); -static void dnodeProcessQueryMsg(SReadMsg *pMsg); -static void dnodeProcessRetrieveMsg(SReadMsg *pMsg); -static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SReadMsg *pNode); +static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg); +static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg); +static void(*dnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(void *pVnode, SReadMsg *pNode); // module global variable static taos_qset readQset; @@ -104,23 +103,19 @@ void dnodeRead(SRpcMsg *pMsg) { } // put message into queue - SReadMsg readMsg = { - .rpcMsg = *pMsg, - .pCont = pCont, - .contLen = pHead->contLen, - .pRpcContext = pRpcContext, - .pVnode = pVnode, - }; + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = *pMsg; + pRead->pCont = pCont; + pRead->contLen = pHead->contLen; + pRead->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeRworker(pVnode); - taosWriteQitem(queue, &readMsg); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); // next vnode leftLen -= pHead->contLen; pCont -= pHead->contLen; queuedMsgNum++; - - dnodeReleaseVnode(pVnode); } if (queuedMsgNum == 0) { @@ -135,11 +130,11 @@ void dnodeRead(SRpcMsg *pMsg) { } } -void *dnodeAllocateReadWorker() { +void *dnodeAllocateReadWorker(void *pVnode) { taos_queue *queue = taosOpenQueue(sizeof(SReadMsg)); if (queue == NULL) return NULL; - taosAddIntoQset(readQset, queue); + taosAddIntoQset(readQset, queue, pVnode); // spawn a thread to process queue if (threads < maxThreads) { @@ -164,20 +159,27 @@ void dnodeFreeReadWorker(void *rqueue) { static void *dnodeProcessReadQueue(void *param) { taos_qset qset = (taos_qset)param; - SReadMsg readMsg; + SReadMsg *pReadMsg; + int type; + void *pVnode; while (1) { - if (taosReadQitemFromQset(qset, &readMsg) <= 0) { + if (taosReadQitemFromQset(qset, &type, (void **)&pReadMsg, (void **)&pVnode) == 0) { dnodeHandleIdleReadWorker(); continue; } terrno = 0; - if (dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) { - (*dnodeProcessReadMsgFp[readMsg.rpcMsg.msgType]) (&readMsg); + if (dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) { + (*dnodeProcessReadMsgFp[pReadMsg->rpcMsg.msgType]) (pVnode, pReadMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } + +// dnodeProcessReadResult(pVnode, pReadMsg); + taosFreeQitem(pReadMsg); + + dnodeReleaseVnode(pVnode); } return NULL; @@ -195,11 +197,11 @@ static void dnodeHandleIdleReadWorker() { } } -static void dnodeProcessReadResult(SReadMsg *pRead) { +static void dnodeProcessReadResult(void *pVnode, SReadMsg *pRead) { SRpcContext *pRpcContext = pRead->pRpcContext; int32_t code = 0; - dnodeReleaseVnode(pRead->pVnode); + dnodeReleaseVnode(pVnode); if (pRpcContext) { if (terrno) { @@ -218,34 +220,48 @@ static void dnodeProcessReadResult(SReadMsg *pRead) { code = terrno; } - SRpcMsg rsp; - rsp.handle = pRead->rpcMsg.handle; - rsp.code = code; - rsp.pCont = NULL; - rpcSendResponse(&rsp); + //TODO: query handle is returned by dnodeProcessQueryMsg + if (0) { + SRpcMsg rsp; + rsp.handle = pRead->rpcMsg.handle; + rsp.code = code; + rsp.pCont = NULL; + rpcSendResponse(&rsp); + } + rpcFreeCont(pRead->rpcMsg.pCont); // free the received message } -static void dnodeContinueExecuteQuery(void* qhandle, SReadMsg *pMsg) { - SReadMsg readMsg = { - .rpcMsg = {.msgType = TSDB_MSG_TYPE_QUERY}, - .pCont = qhandle, - .contLen = 0, - .pRpcContext = pMsg->pRpcContext, - .pVnode = pMsg->pVnode, - }; +static void dnodeContinueExecuteQuery(void* pVnode, void* qhandle, SReadMsg *pMsg) { + + SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); + pRead->rpcMsg = pMsg->rpcMsg; + pRead->pCont = qhandle; + pRead->contLen = 0; + pRead->pRpcContext = pMsg->pRpcContext; + pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY; - taos_queue queue = dnodeGetVnodeRworker(pMsg->pVnode); - taosWriteQitem(queue, &readMsg); + taos_queue queue = dnodeGetVnodeRworker(pVnode); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pRead); + +// SReadMsg readMsg = { +// .rpcMsg = {0}, +// .pCont = qhandle, +// .contLen = 0, +// .pRpcContext = pMsg->pRpcContext, +// }; +// +// taos_queue queue = dnodeGetVnodeRworker(pVnode); +// taosWriteQitem(queue, TSDB_MSG_TYPE_QUERY, &readMsg); } -static void dnodeProcessQueryMsg(SReadMsg *pMsg) { +static void dnodeProcessQueryMsg(void *pVnode, SReadMsg *pMsg) { SQueryTableMsg* pQueryTableMsg = (SQueryTableMsg*) pMsg->pCont; SQInfo* pQInfo = NULL; - if (pMsg->rpcMsg.contLen != 0) { - void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); - int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, pMsg, &pQInfo); + if (pMsg->contLen != 0) { + void* tsdb = dnodeGetVnodeTsdb(pVnode); + int32_t code = qCreateQueryInfo(tsdb, pQueryTableMsg, NULL, &pQInfo); SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp)); pRsp->code = code; @@ -264,16 +280,14 @@ static void dnodeProcessQueryMsg(SReadMsg *pMsg) { pQInfo = pMsg->pCont; } - // do execute query - qTableQuery(pQInfo); + qTableQuery(pQInfo); // do execute query } -static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { +static void dnodeProcessRetrieveMsg(void *pVnode, SReadMsg *pMsg) { SRetrieveTableMsg *pRetrieve = pMsg->pCont; void *pQInfo = (void*) htobe64(pRetrieve->qhandle); dTrace("QInfo:%p vgId:%d, retrieve msg is received", pQInfo, pRetrieve->header.vgId); - int32_t contLen = 0; SRetrieveTableRsp *pRsp = NULL; @@ -288,8 +302,10 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { // todo check code and handle error in build result set code = qDumpRetrieveResult(pQInfo, &pRsp, &contLen); - if (qNeedFurtherExec(pQInfo)) { - dnodeContinueExecuteQuery(pQInfo, pMsg); + if (qHasMoreResultsToRetrieve(pQInfo)) { + dnodeContinueExecuteQuery(pVnode, pQInfo, pMsg); + } else { // no further execution invoked, release the ref to vnode + dnodeProcessReadResult(pVnode, pMsg); } } @@ -302,7 +318,4 @@ static void dnodeProcessRetrieveMsg(SReadMsg *pMsg) { }; rpcSendResponse(&rpcRsp); - - //todo merge result should be done here - //dnodeProcessReadResult(&readMsg); } diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index eb9c42a93fa02cc38b53c55e0fa3e5b6b257d500..7ec206f366a3e1b7151a6dae043dbc7937eb2a35 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -35,7 +35,6 @@ typedef struct _write { void *pCont; int32_t contLen; SRpcMsg rpcMsg; - void *pVnode; // pointer to vnode SRpcContext *pRpcContext; // RPC message context } SWriteMsg; @@ -51,15 +50,15 @@ typedef struct _thread_obj { SWriteWorker *writeWorker; } SWriteWorkerPool; -static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(SWriteMsg *); +static void (*dnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MAX])(void *, SWriteMsg *); static void *dnodeProcessWriteQueue(void *param); static void dnodeHandleIdleWorker(SWriteWorker *pWorker); -static void dnodeProcessWriteResult(SWriteMsg *pWrite); -static void dnodeProcessSubmitMsg(SWriteMsg *pMsg); -static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg); -static void dnodeProcessDropTableMsg(SWriteMsg *pMsg); -static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg); -static void dnodeProcessDropStableMsg(SWriteMsg *pMsg); +static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite); +static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg); +static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg); SWriteWorkerPool wWorkerPool; @@ -116,15 +115,14 @@ void dnodeWrite(SRpcMsg *pMsg) { } // put message into queue - SWriteMsg writeMsg; - writeMsg.rpcMsg = *pMsg; - writeMsg.pCont = pCont; - writeMsg.contLen = pHead->contLen; - writeMsg.pRpcContext = pRpcContext; - writeMsg.pVnode = pVnode; // pVnode shall be saved for usage later + SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); + pWrite->rpcMsg = *pMsg; + pWrite->pCont = pCont; + pWrite->contLen = pHead->contLen; + pWrite->pRpcContext = pRpcContext; taos_queue queue = dnodeGetVnodeWworker(pVnode); - taosWriteQitem(queue, &writeMsg); + taosWriteQitem(queue, TAOS_QTYPE_RPC, pWrite); // next vnode leftLen -= pHead->contLen; @@ -144,16 +142,16 @@ void dnodeWrite(SRpcMsg *pMsg) { } } -void *dnodeAllocateWriteWorker() { +void *dnodeAllocateWriteWorker(void *pVnode) { SWriteWorker *pWorker = wWorkerPool.writeWorker + wWorkerPool.nextId; - taos_queue *queue = taosOpenQueue(sizeof(SWriteMsg)); + taos_queue *queue = taosOpenQueue(); if (queue == NULL) return NULL; if (pWorker->qset == NULL) { pWorker->qset = taosOpenQset(); if (pWorker->qset == NULL) return NULL; - taosAddIntoQset(pWorker->qset, queue); + taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; pthread_attr_t thAttr; @@ -165,7 +163,7 @@ void *dnodeAllocateWriteWorker() { taosCloseQset(pWorker->qset); } } else { - taosAddIntoQset(pWorker->qset, queue); + taosAddIntoQset(pWorker->qset, queue, pVnode); wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } @@ -181,53 +179,57 @@ void dnodeFreeWriteWorker(void *wqueue) { static void *dnodeProcessWriteQueue(void *param) { SWriteWorker *pWorker = (SWriteWorker *)param; taos_qall qall; - SWriteMsg writeMsg; + SWriteMsg *pWriteMsg; int32_t numOfMsgs; + int type; + void *pVnode; + + qall = taosAllocateQall(); while (1) { - numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, &qall); + numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, qall, &pVnode); if (numOfMsgs <=0) { dnodeHandleIdleWorker(pWorker); // thread exit if no queues anymore continue; } - for (int32_t i=0; iwhandle, writeMsg.rpcMsg.msgType, writeMsg.pCont, writeMsg.contLen); } - + // flush WAL file // walFsync(pVnode->whandle); // browse all items, and process them one by one taosResetQitems(qall); for (int32_t i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, &writeMsg); + taosGetQitem(qall, &type, (void **)&pWriteMsg); terrno = 0; - if (dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) { - (*dnodeProcessWriteMsgFp[writeMsg.rpcMsg.msgType]) (&writeMsg); + if (dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) { + (*dnodeProcessWriteMsgFp[pWriteMsg->rpcMsg.msgType]) (pVnode, pWriteMsg); } else { terrno = TSDB_CODE_MSG_NOT_PROCESSED; } - dnodeProcessWriteResult(&writeMsg); + dnodeProcessWriteResult(pVnode, pWriteMsg); + taosFreeQitem(pWriteMsg); } - - // free the Qitems; - taosFreeQitems(qall); } + taosFreeQall(qall); + return NULL; } -static void dnodeProcessWriteResult(SWriteMsg *pWrite) { +static void dnodeProcessWriteResult(void *pVnode, SWriteMsg *pWrite) { SRpcContext *pRpcContext = pWrite->pRpcContext; int32_t code = 0; - dnodeReleaseVnode(pWrite->pVnode); + dnodeReleaseVnode(pVnode); if (pRpcContext) { if (terrno) { @@ -267,7 +269,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { } } -static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { +static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) { dTrace("submit msg is disposed"); SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg)); @@ -276,7 +278,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { pRsp->affectedRows = htonl(1); pRsp->numOfFailedBlocks = 0; - void* tsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + void* tsdb = dnodeGetVnodeTsdb(pVnode); assert(tsdb != NULL); tsdbInsertData(tsdb, pMsg->pCont); @@ -292,7 +294,7 @@ static void dnodeProcessSubmitMsg(SWriteMsg *pMsg) { rpcSendResponse(&rpcRsp); } -static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { +static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -341,16 +343,16 @@ static void dnodeProcessCreateTableMsg(SWriteMsg *pMsg) { tsdbTableSetTagValue(&tCfg, dataRow, false); } - void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + void *pTsdb = dnodeGetVnodeTsdb(pVnode); rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg); - dnodeReleaseVnode(pMsg->pVnode); + dnodeReleaseVnode(pVnode); dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } -static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { +static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) { SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -360,16 +362,16 @@ static void dnodeProcessDropTableMsg(SWriteMsg *pMsg) { .tid = htonl(pTable->sid) }; - void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + void *pTsdb = dnodeGetVnodeTsdb(pVnode); rpcRsp.code = tsdbDropTable(pTsdb, tableId); - dnodeReleaseVnode(pMsg->pVnode); + dnodeReleaseVnode(pVnode); dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } -static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { +static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) { SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -418,16 +420,16 @@ static void dnodeProcessAlterTableMsg(SWriteMsg *pMsg) { tsdbTableSetTagValue(&tCfg, dataRow, false); } - void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); + void *pTsdb = dnodeGetVnodeTsdb(pVnode); rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg); - dnodeReleaseVnode(pMsg->pVnode); + dnodeReleaseVnode(pVnode); dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); } -static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { +static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) { SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont; SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0}; @@ -439,7 +441,7 @@ static void dnodeProcessDropStableMsg(SWriteMsg *pMsg) { //rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid); rpcRsp.code = TSDB_CODE_SUCCESS; - dnodeReleaseVnode(pMsg->pVnode); + dnodeReleaseVnode(pVnode); dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code)); rpcSendResponse(&rpcRsp); diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 570e2e1acd0391f2f59f56b9b68b52e38310da14..d6a9447e3d192120a2712b46d1b2310c2732344f 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -176,7 +176,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size); #define TSDB_MAX_COLUMNS 256 #define TSDB_MIN_COLUMNS 2 //PRIMARY COLUMN(timestamp) + other columns -#define TSDB_DNODE_NAME_LEN 63 +#define TSDB_DNODE_NAME_LEN 64 #define TSDB_TABLE_NAME_LEN 192 #define TSDB_DB_NAME_LEN 32 #define TSDB_COL_NAME_LEN 64 diff --git a/src/mnode/inc/mgmtSuperTable.h b/src/mnode/inc/mgmtSuperTable.h index 922aafed7f9344ad908893cb6c7e6278a541bf35..6d0c565c3022f09460a4ec3eb84bc66c797030b7 100644 --- a/src/mnode/inc/mgmtSuperTable.h +++ b/src/mnode/inc/mgmtSuperTable.h @@ -45,7 +45,8 @@ void * mgmtGetSuperTableVgroup(SSuperTableObj *pStable); int32_t mgmtFindSuperTableTagIndex(SSuperTableObj *pTable, const char *tagName); int32_t mgmtSetSchemaFromSuperTable(SSchema *pSchema, SSuperTableObj *pTable); -void mgmtDropAllSuperTables(SDbObj *pDropDb); +void mgmtDropAllSuperTables(SDbObj *pDropDb); +int32_t mgmtExtractTableName(const char* tableId, char* name); #ifdef __cplusplus } diff --git a/src/mnode/src/mgmtSuperTable.c b/src/mnode/src/mgmtSuperTable.c index f2b54ec000091914f49a5f1b94b97cffc1bed8e6..e8161deb4dc10b27277ab18e7248acf7a7b08967 100644 --- a/src/mnode/src/mgmtSuperTable.c +++ b/src/mnode/src/mgmtSuperTable.c @@ -524,7 +524,7 @@ int32_t mgmtRetrieveShowSuperTables(SShowObj *pShow, char *data, int32_t rows, v } memset(stableName, 0, tListLen(stableName)); - extractTableName(pTable->tableId, stableName); + mgmtExtractTableName(pTable->tableId, stableName); if (pShow->payloadLen > 0 && patternMatch(pShow->payload, stableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) @@ -624,3 +624,17 @@ int32_t mgmtGetSuperTableMeta(SDbObj *pDb, SSuperTableObj *pTable, STableMetaMsg return TSDB_CODE_SUCCESS; } +int32_t mgmtExtractTableName(const char* tableId, char* name) { + int pos = -1; + int num = 0; + for (pos = 0; tableId[pos] != 0; ++pos) { + if (tableId[pos] == '.') num++; + if (num == 2) break; + } + + if (num == 2) { + strcpy(name, tableId + pos + 1); + } + return 0; +} + diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index 675de57924ccb46650085510079459a6c678f454..491e65979a5fc934970a6d899ac09e7dd06ceb07 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -310,7 +310,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * numOfRead++; // pattern compare for meter name - extractTableName(tableId, tableName); + mgmtExtractTableName(tableId, tableName); if (pShow->payloadLen > 0 && patternMatch(pShow->payload, tableName, TSDB_TABLE_NAME_LEN, &info) != TSDB_PATTERN_MATCH) { @@ -333,7 +333,7 @@ int32_t mgmtRetrieveShowTables(SShowObj *pShow, char *data, int32_t rows, void * pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; if (superTableId != NULL) { - extractTableName(superTableId, pWrite); + mgmtExtractTableName(superTableId, pWrite); } cols++; diff --git a/src/query/inc/queryExecutor.h b/src/query/inc/queryExecutor.h index c30d47e261dca42539f1279ca9b98f755c645a20..16f2006d6f54d1a4a9ad94c04b838a7e8a42ca63 100644 --- a/src/query/inc/queryExecutor.h +++ b/src/query/inc/queryExecutor.h @@ -172,7 +172,7 @@ typedef struct SQueryRuntimeEnv { typedef struct SQInfo { void* signature; - void* param; // pointer to the RpcReadMsg +// void* param; // pointer to the RpcReadMsg TSKEY startTime; TSKEY elapsedTime; int32_t pointsInterpo; @@ -236,6 +236,6 @@ int32_t qDumpRetrieveResult(SQInfo *pQInfo, SRetrieveTableRsp** pRsp, int32_t* c * @param pQInfo * @return */ -bool qNeedFurtherExec(SQInfo* pQInfo); +bool qHasMoreResultsToRetrieve(SQInfo* pQInfo); #endif // TDENGINE_QUERYEXECUTOR_H diff --git a/src/query/src/queryExecutor.c b/src/query/src/queryExecutor.c index a4da2073ea13c2144ac02ba3694829344df44e64..e9923c6fd8101fe810f40c14f3a87133856c6c9a 100644 --- a/src/query/src/queryExecutor.c +++ b/src/query/src/queryExecutor.c @@ -6102,7 +6102,7 @@ int32_t qCreateQueryInfo(void* tsdb, SQueryTableMsg *pQueryTableMsg, void* param // pObj->qhandle = vnodeQueryOnMultiMeters(pMeterObjList, pGroupbyExpr, pExprs, pQueryTableMsg, &code); } else { code = createQInfo(pQueryTableMsg, pGroupbyExpr, pExprs, pTableIdList, tsdb, pQInfo); - (*pQInfo)->param = param; +// (*pQInfo)->param = param; } _query_over: @@ -6206,7 +6206,7 @@ static int32_t doDumpQueryResult(SQInfo *pQInfo, char *data) { // todo if interpolation exists, the result may be dump to client by several rounds } -bool qNeedFurtherExec(SQInfo* pQInfo) { +bool qHasMoreResultsToRetrieve(SQInfo* pQInfo) { if (pQInfo == NULL || pQInfo->signature != pQInfo || pQInfo->code != TSDB_CODE_SUCCESS) { return false; } diff --git a/src/rpc/src/rpcMain.c b/src/rpc/src/rpcMain.c index c6f527a7d2851d71b9fe9d296a5ae571bac364b6..bf49be3ed6659e50dfd268a25c9bbac2017f6e68 100755 --- a/src/rpc/src/rpcMain.c +++ b/src/rpc/src/rpcMain.c @@ -32,6 +32,7 @@ #include "rpcServer.h" #include "rpcHead.h" #include "trpc.h" +#include "hash.h" #define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) #define rpcHeadFromCont(cont) ((SRpcHead *) (cont - sizeof(SRpcHead))) @@ -258,7 +259,8 @@ void *rpcOpen(SRpcInit *pInit) { } if (pRpc->connType == TAOS_CONN_SERVER) { - pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); +// pRpc->hash = taosInitStrHash(pRpc->sessions, sizeof(pRpc), taosHashString); + pRpc->hash = taosHashInit(pRpc->sessions, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true); if (pRpc->hash == NULL) { tError("%s failed to init string hash", pRpc->label); rpcClose(pRpc); @@ -292,7 +294,8 @@ void rpcClose(void *param) { } } - taosCleanUpStrHash(pRpc->hash); +// taosCleanUpStrHash(pRpc->hash); + taosHashCleanup(pRpc->hash); taosTmrCleanUp(pRpc->tmrCtrl); taosIdPoolCleanUp(pRpc->idPool); rpcCloseConnCache(pRpc->pCache); @@ -507,8 +510,10 @@ static void rpcCloseConn(void *thandle) { if ( pRpc->connType == TAOS_CONN_SERVER) { char hashstr[40] = {0}; - sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); - taosDeleteStrHash(pRpc->hash, hashstr); + size_t size = sprintf(hashstr, "%x:%x:%x:%d", pConn->peerIp, pConn->linkUid, pConn->peerId, pConn->connType); +// taosDeleteStrHash(pRpc->hash, hashstr); +// taosHashRemove(pRpc->hash, hashstr, size); + rpcFreeMsg(pConn->pRspMsg); // it may have a response msg saved, but not request msg pConn->pRspMsg = NULL; pConn->inType = 0; @@ -556,10 +561,11 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { char hashstr[40] = {0}; SRpcHead *pHead = (SRpcHead *)pRecv->msg; - sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); + size_t size = sprintf(hashstr, "%x:%x:%x:%d", pRecv->ip, pHead->linkUid, pHead->sourceId, pRecv->connType); // check if it is already allocated - SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); +// SRpcConn **ppConn = (SRpcConn **)(taosGetStrHashData(pRpc->hash, hashstr)); + SRpcConn **ppConn = (SRpcConn **)(taosHashGet(pRpc->hash, hashstr, size)); if (ppConn) pConn = *ppConn; if (pConn) return pConn; @@ -591,7 +597,9 @@ static SRpcConn *rpcAllocateServerConn(SRpcInfo *pRpc, SRecvInfo *pRecv) { pConn->localPort = (pRpc->localPort + pRpc->index); } - taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); +// taosAddStrHash(pRpc->hash, hashstr, (char *)&pConn); + taosHashPut(pRpc->hash, hashstr, size, (char *)&pConn, POINTER_BYTES); + tTrace("%s %p, rpc connection is allocated, sid:%d id:%s port:%u", pRpc->label, pConn, sid, pConn->user, pConn->localPort); } diff --git a/src/rpc/test/rserver.c b/src/rpc/test/rserver.c index d9e5da51a67f34c7eeeff3c148e93b26defdc9c4..6c5b320809b8c714a3f30caefa952e289b57a36d 100644 --- a/src/rpc/test/rserver.c +++ b/src/rpc/test/rserver.c @@ -28,10 +28,13 @@ void *qhandle = NULL; void processShellMsg() { static int num = 0; taos_qall qall; - SRpcMsg rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); while (1) { - int numOfMsgs = taosReadAllQitems(qhandle, &qall); + int numOfMsgs = taosReadAllQitems(qhandle, qall); if (numOfMsgs <= 0) { usleep(1000); continue; @@ -40,10 +43,10 @@ void processShellMsg() { tTrace("%d shell msgs are received", numOfMsgs); for (int i=0; i=0) { - if ( write(dataFd, rpcMsg.pCont, rpcMsg.contLen) <0 ) { + if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) { tPrint("failed to write data file, reason:%s", strerror(errno)); } } @@ -62,19 +65,22 @@ void processShellMsg() { taosResetQitems(qall); for (int i=0; ipCont); + rpcMsg.pCont = rpcMallocCont(msgSize); rpcMsg.contLen = msgSize; - rpcMsg.handle = rpcMsg.handle; + rpcMsg.handle = pRpcMsg->handle; rpcMsg.code = 1; rpcSendResponse(&rpcMsg); + + taosFreeQitem(pRpcMsg); } - taosFreeQitems(qall); } + taosFreeQall(qall); /* SRpcIpSet ipSet; ipSet.numOfIps = 1; @@ -108,8 +114,13 @@ int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char } void processRequestMsg(SRpcMsg *pMsg) { - tTrace("request is received, type:%d, contLen:%d", pMsg->msgType, pMsg->contLen); - taosWriteQitem(qhandle, pMsg); + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + tTrace("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); + taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); } int main(int argc, char *argv[]) { @@ -143,6 +154,7 @@ int main(int argc, char *argv[]) { commit = atoi(argv[++i]); } else if (strcmp(argv[i], "-d")==0 && i < argc-1) { rpcDebugFlag = atoi(argv[++i]); + ddebugFlag = rpcDebugFlag; uDebugFlag = rpcDebugFlag; } else { printf("\nusage: %s [options] \n", argv[0]); diff --git a/src/util/inc/tlog.h b/src/util/inc/tlog.h index 3a327b0a471c4671c684d80e0215f0c863239418..8a3bd6bc7c9b2728430f5341d0f1b0c94bc9b17c 100644 --- a/src/util/inc/tlog.h +++ b/src/util/inc/tlog.h @@ -78,7 +78,7 @@ void taosResetLogFile(); // utility log function #define pError(...) \ if (uDebugFlag & DEBUG_ERROR) { \ - tprintf("ERROR UTL ", 255, __VA_ARGS__); \ + tprintf("ERROR UTL ", uDebugFlag, __VA_ARGS__); \ } #define pWarn(...) \ if (uDebugFlag & DEBUG_WARN) { \ diff --git a/src/util/inc/tqueue.h b/src/util/inc/tqueue.h index 97408110d4d8acc9c5b77b0fe296e58c32ec8275..c45eb10518765e65142eabd3294c8cc851331f3a 100644 --- a/src/util/inc/tqueue.h +++ b/src/util/inc/tqueue.h @@ -20,28 +20,35 @@ extern "C" { #endif +#define TAOS_QTYPE_RPC 0 +#define TAOS_QTYPE_FWD 1 +#define TAOS_QTYPE_WAL 2 + typedef void* taos_queue; typedef void* taos_qset; typedef void* taos_qall; -taos_queue taosOpenQueue(int itemSize); +taos_queue taosOpenQueue(); void taosCloseQueue(taos_queue); -int taosWriteQitem(taos_queue, void *item); -int taosReadQitem(taos_queue, void *item); - -int taosReadAllQitems(taos_queue, taos_qall *); -int taosGetQitem(taos_qall, void *item); +void *taosAllocateQitem(int size); +void taosFreeQitem(void *item); +int taosWriteQitem(taos_queue, int type, void *item); +int taosReadQitem(taos_queue, int *type, void **pitem); + +taos_qall taosAllocateQall(); +void taosFreeQall(taos_qall); +int taosReadAllQitems(taos_queue, taos_qall); +int taosGetQitem(taos_qall, int *type, void **pitem); void taosResetQitems(taos_qall); -void taosFreeQitems(taos_qall); taos_qset taosOpenQset(); void taosCloseQset(); -int taosAddIntoQset(taos_qset, taos_queue); +int taosAddIntoQset(taos_qset, taos_queue, void *ahandle); void taosRemoveFromQset(taos_qset, taos_queue); int taosGetQueueNumber(taos_qset); -int taosReadQitemFromQset(taos_qset, void *item); -int taosReadAllQitemsFromQset(taos_qset, taos_qall *); +int taosReadQitemFromQset(taos_qset, int *type, void **pitem, void **handle); +int taosReadAllQitemsFromQset(taos_qset, taos_qall, void **handle); int taosGetQueueItemsNumber(taos_queue param); int taosGetQsetItemsNumber(taos_qset param); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index cb218126a877714c71ef2363960695fa4fb7694b..1ac05556d6a90eb664c0a907f752010bf4ff52f5 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -19,6 +19,7 @@ #include "tqueue.h" typedef struct _taos_qnode { + int type; struct _taos_qnode *next; char item[]; } STaosQnode; @@ -30,6 +31,7 @@ typedef struct _taos_q { struct _taos_qnode *tail; struct _taos_q *next; // for queue set struct _taos_qset *qset; // for queue set + void *ahandle; // for queue set pthread_mutex_t mutex; } STaosQueue; @@ -48,7 +50,7 @@ typedef struct _taos_qall { int32_t numOfItems; } STaosQall; -taos_queue taosOpenQueue(int itemSize) { +taos_queue taosOpenQueue() { STaosQueue *queue = (STaosQueue *) calloc(sizeof(STaosQueue), 1); if (queue == NULL) { @@ -57,8 +59,6 @@ taos_queue taosOpenQueue(int itemSize) { } pthread_mutex_init(&queue->mutex, NULL); - queue->itemSize = (int32_t)itemSize; - return queue; } @@ -83,16 +83,26 @@ void taosCloseQueue(taos_queue param) { free(queue); } -int taosWriteQitem(taos_queue param, void *item) { - STaosQueue *queue = (STaosQueue *)param; +void *taosAllocateQitem(int size) { + STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + size, 1); + if (pNode == NULL) return NULL; + return (void *)pNode->item; +} - STaosQnode *pNode = (STaosQnode *)calloc(sizeof(STaosQnode) + queue->itemSize, 1); - if ( pNode == NULL ) { - terrno = TSDB_CODE_NO_RESOURCE; - return -1; - } +void taosFreeQitem(void *param) { + if (param == NULL) return; - memcpy(pNode->item, item, queue->itemSize); + //pTrace("item:%p is freed", param); + + char *temp = (char *)param; + temp -= sizeof(STaosQnode); + free(temp); +} + +int taosWriteQitem(taos_queue param, int type, void *item) { + STaosQueue *queue = (STaosQueue *)param; + STaosQnode *pNode = (STaosQnode *)(((char *)item) - sizeof(STaosQnode)); + pNode->type = type; pthread_mutex_lock(&queue->mutex); @@ -107,12 +117,14 @@ int taosWriteQitem(taos_queue param, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); + //pTrace("item:%p is put into queue, items:%d", item, queue->numOfItems); + pthread_mutex_unlock(&queue->mutex); return 0; } -int taosReadQitem(taos_queue param, void *item) { +int taosReadQitem(taos_queue param, int *type, void **pitem) { STaosQueue *queue = (STaosQueue *)param; STaosQnode *pNode = NULL; int code = 0; @@ -121,14 +133,15 @@ int taosReadQitem(taos_queue param, void *item) { if (queue->head) { pNode = queue->head; - memcpy(item, pNode->item, queue->itemSize); + *pitem = pNode->item; + *type = pNode->type; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - free(pNode); queue->numOfItems--; if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, 1); code = 1; + //pTrace("item:%p is read out from queue, items:%d", *pitem, queue->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -136,39 +149,42 @@ int taosReadQitem(taos_queue param, void *item) { return code; } -int taosReadAllQitems(taos_queue param, taos_qall *res) { +void *taosAllocateQall() { + void *p = malloc(sizeof(STaosQall)); + return p; +} + +void taosFreeQall(void *param) { + free(param); +} + +int taosReadAllQitems(taos_queue param, taos_qall p2) { STaosQueue *queue = (STaosQueue *)param; - STaosQall *qall = NULL; + STaosQall *qall = (STaosQall *)p2; int code = 0; pthread_mutex_lock(&queue->mutex); if (queue->head) { - qall = (STaosQall *) calloc(sizeof(STaosQall), 1); - if ( qall == NULL ) { - terrno = TSDB_CODE_NO_RESOURCE; - code = -1; - } else { - qall->current = queue->head; - qall->start = queue->head; - qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; - code = qall->numOfItems; - - queue->head = NULL; - queue->tail = NULL; - queue->numOfItems = 0; - if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); - } + memset(qall, 0, sizeof(STaosQall)); + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; + + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + if (queue->qset) atomic_sub_fetch_32(&queue->qset->numOfItems, qall->numOfItems); } pthread_mutex_unlock(&queue->mutex); - *res = qall; return code; } -int taosGetQitem(taos_qall param, void *item) { +int taosGetQitem(taos_qall param, int *type, void **pitem) { STaosQall *qall = (STaosQall *)param; STaosQnode *pNode; int num = 0; @@ -178,8 +194,10 @@ int taosGetQitem(taos_qall param, void *item) { qall->current = pNode->next; if (pNode) { - memcpy(item, pNode->item, qall->itemSize); + *pitem = pNode->item; + *type = pNode->type; num = 1; + //pTrace("item:%p is fetched", *pitem); } return num; @@ -190,19 +208,6 @@ void taosResetQitems(taos_qall param) { qall->current = qall->start; } -void taosFreeQitems(taos_qall param) { - STaosQall *qall = (STaosQall *)param; - STaosQnode *pNode; - - while (qall->current) { - pNode = qall->current; - qall->current = pNode->next; - free(pNode); - } - - free(qall); -} - taos_qset taosOpenQset() { STaosQset *qset = (STaosQset *) calloc(sizeof(STaosQset), 1); @@ -221,7 +226,7 @@ void taosCloseQset(taos_qset param) { free(qset); } -int taosAddIntoQset(taos_qset p1, taos_queue p2) { +int taosAddIntoQset(taos_qset p1, taos_queue p2, void *ahandle) { STaosQueue *queue = (STaosQueue *)p2; STaosQset *qset = (STaosQset *)p1; @@ -230,6 +235,7 @@ int taosAddIntoQset(taos_qset p1, taos_queue p2) { pthread_mutex_lock(&qset->mutex); queue->next = qset->head; + queue->ahandle = ahandle; qset->head = queue; qset->numOfQueues++; @@ -283,7 +289,7 @@ int taosGetQueueNumber(taos_qset param) { return ((STaosQset *)param)->numOfQueues; } -int taosReadQitemFromQset(taos_qset param, void *item) { +int taosReadQitemFromQset(taos_qset param, int *type, void **pitem, void **phandle) { STaosQset *qset = (STaosQset *)param; STaosQnode *pNode = NULL; int code = 0; @@ -301,11 +307,12 @@ int taosReadQitemFromQset(taos_qset param, void *item) { if (queue->head) { pNode = queue->head; - memcpy(item, pNode->item, queue->itemSize); + *pitem = pNode->item; + *type = pNode->type; + *phandle = queue->ahandle; queue->head = pNode->next; if (queue->head == NULL) queue->tail = NULL; - free(pNode); queue->numOfItems--; atomic_sub_fetch_32(&qset->numOfItems, 1); code = 1; @@ -318,10 +325,10 @@ int taosReadQitemFromQset(taos_qset param, void *item) { return code; } -int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { +int taosReadAllQitemsFromQset(taos_qset param, taos_qall p2, void **phandle) { STaosQset *qset = (STaosQset *)param; STaosQueue *queue; - STaosQall *qall = NULL; + STaosQall *qall = (STaosQall *)p2; int code = 0; for(int i=0; inumOfQueues; ++i) { @@ -336,22 +343,17 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { pthread_mutex_lock(&queue->mutex); if (queue->head) { - qall = (STaosQall *) calloc(sizeof(STaosQall), 1); - if (qall == NULL) { - terrno = TSDB_CODE_NO_RESOURCE; - code = -1; - } else { - qall->current = queue->head; - qall->start = queue->head; - qall->numOfItems = queue->numOfItems; - qall->itemSize = queue->itemSize; - code = qall->numOfItems; + qall->current = queue->head; + qall->start = queue->head; + qall->numOfItems = queue->numOfItems; + qall->itemSize = queue->itemSize; + code = qall->numOfItems; + *phandle = queue->ahandle; - queue->head = NULL; - queue->tail = NULL; - queue->numOfItems = 0; - atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); - } + queue->head = NULL; + queue->tail = NULL; + queue->numOfItems = 0; + atomic_sub_fetch_32(&qset->numOfItems, qall->numOfItems); } pthread_mutex_unlock(&queue->mutex); @@ -359,8 +361,6 @@ int taosReadAllQitemsFromQset(taos_qset param, taos_qall *res) { if (code != 0) break; } - *res = qall; - return code; } diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index 7ab004646e3094d1a231e30eafaf6a966f0bba8b..86be428af70f6bb04bc72c51f64b2dbb333e493d 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -330,7 +330,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { struct sockaddr_in serverAddr, clientAddr; int ret; - pTrace("open tcp client socket:%s:%d", destIp, destPort); + // pTrace("open tcp client socket:%s:%d, local Ip:%s", destIp, destPort, clientIp); sockFd = (int)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -362,7 +362,7 @@ int taosOpenTcpClientSocket(char *destIp, uint16_t destPort, char *clientIp) { ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); if (ret != 0) { - pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); + //pError("failed to connect socket, ip:%s, port:%hu, reason: %s", destIp, destPort, strerror(errno)); taosCloseSocket(sockFd); sockFd = -1; } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 7f3acb66249570c015dfb0bf31106907d8c047a5..9a4d94c58fc99ef8fc647d188d2d41f2979db212 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -34,20 +34,22 @@ typedef enum { TSDB_FILE_TYPE_MAX } TSDB_FILE_TYPE; -extern const char *tsdbFileSuffix[]; +#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) -typedef struct { - int64_t size; - int64_t tombSize; -} SFileInfo; +extern const char *tsdbFileSuffix[]; typedef struct { int8_t type; + int fd; char fname[128]; int64_t size; // total size of the file int64_t tombSize; // unused file size + int32_t totalBlocks; + int32_t totalSubBlocks; } SFile; +#define TSDB_IS_FILE_OPENED(f) ((f)->fd != -1) + typedef struct { int32_t fileId; SFile files[TSDB_FILE_TYPE_MAX]; @@ -55,14 +57,26 @@ typedef struct { // TSDB file handle typedef struct { - int32_t daysPerFile; - int32_t keep; - int32_t minRowPerFBlock; - int32_t maxRowsPerFBlock; - int32_t maxTables; + int maxFGroups; + int numOfFGroups; + SFileGroup fGroup[]; } STsdbFileH; +#define TSDB_MIN_FILE_ID(fh) (fh)->fGroup[0].fileId +#define TSDB_MAX_FILE_ID(fh) (fh)->fGroup[(fh)->numOfFGroups - 1].fileId + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles); +void tsdbCloseFileH(STsdbFileH *pFileH); +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables); +int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid); + +typedef struct { + int32_t len; + int32_t padding; // For padding purpose + int64_t offset; +} SCompIdx; + /** * if numOfSubBlocks == -1, then the SCompBlock is a sub-block * if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to @@ -83,14 +97,32 @@ typedef struct { TSKEY keyLast; } SCompBlock; -#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX) +typedef struct { + int32_t delimiter; // For recovery usage + int32_t checksum; // TODO: decide if checksum logic in this file or make it one API + int64_t uid; + int32_t padding; // For padding purpose + int32_t numOfBlocks; // TODO: make the struct padding + SCompBlock blocks[]; +} SCompInfo; -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables); +// TODO: take pre-calculation into account +typedef struct { + int16_t colId; // Column ID + int16_t len; // Column length + int32_t type : 8; + int32_t offset : 24; +} SCompCol; + +// TODO: Take recover into account +typedef struct { + int32_t delimiter; // For recovery usage + int32_t numOfCols; // For recovery usage + int64_t uid; // For recovery usage + SCompCol cols[]; +} SCompData; -void tsdbCloseFile(STsdbFileH *pFileH); -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); -void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); +void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 98562be0cce5e057049c20d4f586453522908750..f622c38b5ff4acf79e5ec405de0343a5d5197a19 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -27,72 +27,126 @@ #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F -typedef struct { - int32_t len; - int32_t padding; // For padding purpose - int64_t offset; -} SCompIdx; - -typedef struct { - int32_t delimiter; // For recovery usage - int32_t checksum; // TODO: decide if checksum logic in this file or make it one API - int64_t uid; - int32_t padding; // For padding purpose - int32_t numOfBlocks; // TODO: make the struct padding - SCompBlock blocks[]; -} SCompInfo; - -// TODO: take pre-calculation into account -typedef struct { - int16_t colId; // Column ID - int16_t len; // Column length - int32_t type : 8; - int32_t offset : 24; -} SCompCol; - -// TODO: Take recover into account -typedef struct { - int32_t delimiter; // For recovery usage - int32_t numOfCols; // For recovery usage - int64_t uid; // For recovery usage - SCompCol cols[]; -} SCompData; - const char *tsdbFileSuffix[] = { ".head", // TSDB_FILE_TYPE_HEAD ".data", // TSDB_FILE_TYPE_DATA ".last" // TSDB_FILE_TYPE_LAST }; -static int tsdbWriteFileHead(int fd, SFile *pFile) { +static int compFGroupKey(const void *key, const void *fgroup); +static int compFGroup(const void *arg1, const void *arg2); +static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname); +static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile); +static int tsdbWriteFileHead(SFile *pFile); +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables); + +STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles) { + STsdbFileH *pFileH = (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * maxFiles); + if (pFileH == NULL) { // TODO: deal with ERROR here + return NULL; + } + + pFileH->maxFGroups = maxFiles; + + DIR *dir = opendir(dataDir); + if (dir == NULL) { + free(pFileH); + return NULL; + } + + struct dirent *dp; + while ((dp = readdir(dir)) != NULL) { + if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 1) == 0) continue; + // TODO + } + + return pFileH; +} + +void tsdbCloseFileH(STsdbFileH *pFileH) { free(pFileH); } + +int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) { + if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1; + + SFileGroup fGroup; + SFileGroup *pFGroup = &fGroup; + if (fid < TSDB_MIN_FILE_ID(pFileH) || fid > TSDB_MAX_FILE_ID(pFileH) || + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey) == + NULL) { + pFGroup->fileId = fid; + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + if (tsdbCreateFile(dataDir, fid, type, maxTables, &(pFGroup->files[type])) < 0) { + // TODO: deal with the ERROR here, remove those creaed file + return -1; + } + } + + pFileH->fGroup[pFileH->numOfFGroups++] = fGroup; + qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup); + } + return 0; +} + +int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) { + SFileGroup *pGroup = + bsearch((void *)&fid, (void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroupKey); + if (pGroup == NULL) return -1; + + // Remove from disk + for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { + remove(pGroup->files[type].fname); + } + + // Adjust the memory + int filesBehind = pFileH->numOfFGroups - (((char *)pGroup - (char *)(pFileH->fGroup)) / sizeof(SFileGroup) + 1); + if (filesBehind > 0) { + memmove((void *)pGroup, (void *)((char *)pGroup + sizeof(SFileGroup)), sizeof(SFileGroup) * filesBehind); + } + pFileH->numOfFGroups--; + + return 0; +} + +static int compFGroupKey(const void *key, const void *fgroup) { + int fid = *(int *)key; + SFileGroup *pFGroup = (SFileGroup *)fgroup; + return (fid - pFGroup->fileId); +} + +static int compFGroup(const void *arg1, const void *arg2) { + return ((SFileGroup *)arg1)->fileId - ((SFileGroup *)arg2)->fileId; +} + +static int tsdbWriteFileHead(SFile *pFile) { char head[TSDB_FILE_HEAD_SIZE] = "\0"; pFile->size += TSDB_FILE_HEAD_SIZE; // TODO: write version and File statistic to the head - lseek(fd, 0, SEEK_SET); - if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; + lseek(pFile->fd, 0, SEEK_SET); + if (write(pFile->fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1; return 0; } -static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) { +static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) { int size = sizeof(SCompIdx) * maxTables; void *buf = calloc(1, size); if (buf == NULL) return -1; - if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { + if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) { free(buf); return -1; } - if (write(fd, buf, size) < 0) { + if (write(pFile->fd, buf, size) < 0) { free(buf); return -1; } pFile->size += size; + free(buf); return 0; } @@ -104,12 +158,27 @@ static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) return 0; } -/** - * Create a file and set the SFile object - */ +static int tsdbOpenFileForWrite(SFile *pFile, int oflag) { // TODO: change the function + if (TSDB_IS_FILE_OPENED(pFile)) return -1; + + pFile->fd = open(pFile->fname, oflag, 0755); + if (pFile->fd < 0) return -1; + + return 0; +} + +static int tsdbCloseFile(SFile *pFile) { + if (!TSDB_IS_FILE_OPENED(pFile)) return -1; + int ret = close(pFile->fd); + pFile->fd = -1; + + return ret; +} + static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) { memset((void *)pFile, 0, sizeof(SFile)); pFile->type = type; + pFile->fd = -1; tsdbGetFileName(dataDir, fileId, type, pFile->fname); if (access(pFile->fname, F_OK) == 0) { @@ -117,93 +186,28 @@ static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, return -1; } - int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755); - if (fd < 0) return -1; + if (tsdbOpenFileForWrite(pFile, O_WRONLY | O_CREAT) < 0) { + // TODO: deal with the ERROR here + return -1; + } if (type == TSDB_FILE_TYPE_HEAD) { - if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) { - close(fd); + if (tsdbWriteHeadFileIdx(pFile, maxTables) < 0) { + tsdbCloseFile(pFile); return -1; } } - if (tsdbWriteFileHead(fd, pFile) < 0) { - close(fd); + if (tsdbWriteFileHead(pFile) < 0) { + tsdbCloseFile(pFile); return -1; } - close(fd); - - return 0; -} - -static int tsdbRemoveFile(SFile *pFile) { - if (pFile == NULL) return -1; - return remove(pFile->fname); -} - -// Create a file group with fileId and return a SFileGroup object -int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) { - if (dataDir == NULL || pFGroup == NULL) return -1; - - memset((void *)pFGroup, 0, sizeof(SFileGroup)); - - for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) { - if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) { - // TODO: deal with the error here, remove the created files - return -1; - } - } - - pFGroup->fileId = fileId; + tsdbCloseFile(pFile); return 0; } -/** - * Initialize the TSDB file handle - */ -STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, - int32_t maxRowsPerFBlock, int32_t maxTables) { - STsdbFileH *pTsdbFileH = - (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); - if (pTsdbFileH == NULL) return NULL; - - pTsdbFileH->daysPerFile = daysPerFile; - pTsdbFileH->keep = keep; - pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; - pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; - pTsdbFileH->maxTables = maxTables; - - // Open the directory to read information of each file - DIR *dir = opendir(dataDir); - if (dir == NULL) { - free(pTsdbFileH); - return NULL; - } - - char fname[256]; - - struct dirent *dp; - while ((dp = readdir(dir)) != NULL) { - if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; - if (true /* check if the file is the .head file */) { - int fileId = 0; - int vgId = 0; - sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId); - // TODO - - // Open head file - - // Open data file - - // Open last file - } - } - - return pTsdbFileH; -} - void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 2df7974844943eaa90947f45baa66724d2240cb5..af3a923d904d62c5fc3af8d8f4ca8dfad64c7282 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -182,7 +182,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO char dataDir[128] = "\0"; tsdbGetDataDirName(pRepo, dataDir); pRepo->tsdbFileH = - tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables); + tsdbInitFileH(dataDir, pCfg->maxTables); if (pRepo->tsdbFileH == NULL) { free(pRepo->rootDir); tsdbFreeCache(pRepo->tsdbCache); @@ -782,19 +782,51 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max return numOfRows; } +static void tsdbDestroyTableIters(SSkipListIterator **iters, int maxTables) { + if (iters == NULL) return; + + for (int tid = 0; tid < maxTables; tid++) { + if (iters[tid] == NULL) continue; + tSkipListDestroyIter(iters[tid]); + } + + free(iters); +} + +static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables) { + SSkipListIterator **iters = (SSkipListIterator *)calloc(maxTables, sizeof(SSkipListIterator *)); + if (iters == NULL) return NULL; + + for (int tid = 0; tid < maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL || pTable->imem == NULL) continue; + + iters[tid] = tSkipListCreateIter(pTable->imem->pData); + if (iters[tid] == NULL) { + tsdbDestroyTableIters(iters, maxTables); + return NULL; + } + + if (!tSkipListIterNext(iters[tid])) { + assert(false); + } + } + + return iters; +} + // Commit to file static void *tsdbCommitToFile(void *arg) { // TODO + printf("Starting to commit....\n"); STsdbRepo * pRepo = (STsdbRepo *)arg; STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbCache *pCache = pRepo->tsdbCache; STsdbCfg * pCfg = &(pRepo->config); if (pCache->imem == NULL) return; - int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); - int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); - - SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); + // Create the iterator to read from cache + SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables); if (iters == NULL) { // TODO: deal with the error return NULL; @@ -805,10 +837,15 @@ static void *tsdbCommitToFile(void *arg) { SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols); void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock); + int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision); + int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision); + for (int fid = sfid; fid <= efid; fid++) { TSKEY minKey = 0, maxKey = 0; tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + // tsdbOpenFileForWrite(pRepo, fid); + for (int tid = 0; tid < pCfg->maxTables; tid++) { STable *pTable = pMeta->tables[tid]; if (pTable == NULL || pTable->imem == NULL) continue; @@ -837,14 +874,10 @@ static void *tsdbCommitToFile(void *arg) { } } - // Free the iterator - for (int tid = 0; tid < pCfg->maxTables; tid++) { - if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]); - } + tsdbDestroyTableIters(iters, pCfg->maxTables); free(buf); free(cols); - free(iters); tsdbLockRepo(arg); tdListMove(pCache->imem->list, pCache->pool.memPool); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index a76aef2d41d154ca64ac48bfb3a5075309fe3f7f..bc6532984fddd74934c9f87a9e5f1086f08faef3 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -142,7 +142,7 @@ TEST(TsdbTest, DISABLED_openRepo) { TEST(TsdbTest, DISABLED_createFileGroup) { SFileGroup fGroup; - ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); + // ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0); int k = 0; } \ No newline at end of file diff --git a/tests/examples/c/demo.c b/tests/examples/c/demo.c index dc16185d9a1b74cad4492c09f48dc3437772e16e..6fcedb8123cb2ea8e6bb6b89a3b7d45c96bcd3f6 100644 --- a/tests/examples/c/demo.c +++ b/tests/examples/c/demo.c @@ -64,6 +64,9 @@ int main(int argc, char *argv[]) { memset(buf, 0, 512); } + taos_close(taos); + + getchar(); return 0; taos_query(taos, "drop database demo");