diff --git a/src/dnode/src/dnodeWrite.c b/src/dnode/src/dnodeWrite.c index 2348cf62a4c2b295a47b185087f0ed5b16c00596..aee14ed48414aa68590e246f8563806cd1df6a20 100644 --- a/src/dnode/src/dnodeWrite.c +++ b/src/dnode/src/dnodeWrite.c @@ -40,7 +40,7 @@ typedef struct { SRpcMsg rpcMsg; } SWriteMsg; -typedef struct _thread_obj { +typedef struct _wworker_pool { int32_t max; // max number of workers int32_t nextId; // from 0 to max-1, cyclic SWriteWorker *writeWorker; diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 6d75776ac810140bdb54670e4cc723ef1e2b4e3f..2cf94267f8dd29b323dba1cbd8706388066b463d 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -92,7 +92,7 @@ void *taosAllocateQitem(int size) { void taosFreeQitem(void *param) { if (param == NULL) return; - //pTrace("item:%p is freed", param); + pTrace("item:%p is freed", param); char *temp = (char *)param; temp -= sizeof(STaosQnode); @@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) { queue->numOfItems++; if (queue->qset) atomic_add_fetch_32(&queue->qset->numOfItems, 1); - //pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); + pTrace("item:%p is put into queue, type:%d items:%d", item, type, queue->numOfItems); pthread_mutex_unlock(&queue->mutex); @@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) { *pitem = pNode->item; *type = pNode->type; num = 1; - // pTrace("item:%p is fetched, type:%d", *pitem, *type); + pTrace("item:%p is fetched, type:%d", *pitem, *type); } return num; diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index be41b6b7c199d5f73aa39a3f688a304fab7381f5..5bb5ef55efd22c380a14c13d3602c7c2ff8dd01b 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -165,7 +165,10 @@ void vnodeRelease(void *pVnodeRaw) { int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); - if (refCount > 0) return; + if (refCount > 0) { + dTrace("pVnode:%p vgId:%d, release vnode, refCount:%d", pVnode, pVnode->vgId, refCount); + return; + } // remove read queue dnodeFreeRqueue(pVnode->rqueue); diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 2c31f2243bbe773c6c3857617200a823d0c42a09..c6699bd62c5ef8d0b797b37da22e79307bf0301f 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -255,7 +255,8 @@ int vnodeWriteToQueue(void *param, SWalHead *pHead, int type) { SWalHead *pWal = (SWalHead *)taosAllocateQitem(size); memcpy(pWal, pHead, size); - taosWriteQitem(pVnode->wqueue, type, pHead); + atomic_add_fetch_32(&pVnode->refCount, 1); + taosWriteQitem(pVnode->wqueue, type, pWal); return 0; } diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index 88ede6aabdfde14fa8df6121a52c6e68c084eda7..cb59c004efed1533326a8517e657f177aef16c29 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -50,7 +50,7 @@ enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURI static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo); static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo); -static int tsdbOpenMetaFile(char *tsdbDir); +// static int tsdbOpenMetaFile(char *tsdbDir); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname); @@ -222,10 +222,10 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) { return (tsdb_repo_t *)pRepo; } -static int32_t tsdbFlushCache(STsdbRepo *pRepo) { - // TODO - return 0; -} +// static int32_t tsdbFlushCache(STsdbRepo *pRepo) { +// // TODO +// return 0; +// } /** * Close a TSDB repository. Only free memory resources, and keep the files. @@ -679,10 +679,10 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) { return 0; } -static int tsdbOpenMetaFile(char *tsdbDir) { - // TODO - return 0; -} +// static int tsdbOpenMetaFile(char *tsdbDir) { +// // TODO +// return 0; +// } static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable) { // TODO @@ -937,7 +937,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks); - SCompBlock tBlock; + SCompData tBlock; int64_t toffset; int32_t tlen; tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock); @@ -968,7 +968,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters pCompInfo->uid = pTable->tableId.uid; // Load SCompBlock part if neccessary - int isCompBlockLoaded = 0; + // int isCompBlockLoaded = 0; if (0) { // if (pIdx->offset > 0) { if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) { @@ -976,7 +976,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100); if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */ } - if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; + // if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1; } else { // TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part // and write those new blocks to it @@ -1023,7 +1023,6 @@ _table_over: int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len); if (bytes < pIdx->len) { printf("Failed to send file, reason: %s\n", strerror(errno)); - int d = 1; } if (nNewBlocks > 0) { write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks); diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 01fb37eec18372c375b44e33f3fb0ee2fe92eca8..0c6fc6170144d239daf162d7232577fb520e5b34 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -374,10 +374,10 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) { return tsdbAddTableIntoMap(pMeta, pTable); } -static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { - // TODO - return 0; -} +// static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) { +// // TODO +// return 0; +// } static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) { // TODO: add the table to the map diff --git a/src/vnode/tsdb/src/tsdbMetaFile.c b/src/vnode/tsdb/src/tsdbMetaFile.c index 7ecb5c15ac0e983f9aa80fc2f86f7a62fde9f643..d3cff1772c6732d22d38c359413c45a5212d97b6 100644 --- a/src/vnode/tsdb/src/tsdbMetaFile.c +++ b/src/vnode/tsdb/src/tsdbMetaFile.c @@ -29,7 +29,7 @@ typedef struct { } SRecordInfo; static int32_t tsdbGetMetaFileName(char *rootDir, char *fname); -static int32_t tsdbCheckMetaHeader(int fd); +// static int32_t tsdbCheckMetaHeader(int fd); static int32_t tsdbWriteMetaHeader(int fd); static int tsdbCreateMetaFile(char *fname); static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh); @@ -185,10 +185,10 @@ static int32_t tsdbGetMetaFileName(char *rootDir, char *fname) { return 0; } -static int32_t tsdbCheckMetaHeader(int fd) { - // TODO: write the meta file header check function - return 0; -} +// static int32_t tsdbCheckMetaHeader(int fd) { +// // TODO: write the meta file header check function +// return 0; +// } static int32_t tsdbWriteMetaHeader(int fd) { // TODO: write the meta file header to file @@ -199,10 +199,10 @@ static int32_t tsdbWriteMetaHeader(int fd) { return 0; } -static int32_t tsdbReadMetaHeader(int fd) { - lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET); - return 0; -} +// static int32_t tsdbReadMetaHeader(int fd) { +// lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET); +// return 0; +// } static int tsdbCreateMetaFile(char *fname) { int fd = open(fname, O_RDWR | O_CREAT, 0755); diff --git a/src/vnode/wal/src/walMain.c b/src/vnode/wal/src/walMain.c index e192e91e8a156a4d34a38101afbeb96c4adf9e03..9708b0d9dc31429412aff292567175983cd20f32 100644 --- a/src/vnode/wal/src/walMain.c +++ b/src/vnode/wal/src/walMain.c @@ -287,6 +287,8 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW (*writeFp)(pVnode, pHead, TAOS_QTYPE_WAL); } + free(buffer); + return code; }