提交 8ef6eb4c 编写于 作者: dengyihao's avatar dengyihao

enh: refactor index code

上级 6ca5e3ae
......@@ -62,6 +62,7 @@ extern int32_t fsDebugFlag;
extern int32_t metaDebugFlag;
extern int32_t fnDebugFlag;
extern int32_t smaDebugFlag;
extern int32_t idxDebugFlag;
int32_t taosInitLog(const char *logName, int32_t maxFiles);
void taosCloseLog();
......
......@@ -79,9 +79,10 @@ uint16_t tsTelemPort = 80;
// schemaless
char tsSmlTagName[TSDB_COL_NAME_LEN] = "_tag_null";
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash.
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol)
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table name can be specified in tag value.
// If set to empty system will generate table name using MD5 hash.
bool tsSmlDataFormat =
true; // true means that the name and order of cols in each line are the same(only for influx protocol)
// query
int32_t tsQueryPolicy = 1;
......@@ -302,6 +303,7 @@ static int32_t taosAddServerLogCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "qDebugFlag", qDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "wDebugFlag", wDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "sDebugFlag", sDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "idxDebugFlag", idxDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tsdbDebugFlag", tsdbDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "tqDebugFlag", tqDebugFlag, 0, 255, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "fsDebugFlag", fsDebugFlag, 0, 255, 0) != 0) return -1;
......@@ -479,6 +481,7 @@ static void taosSetClientLogCfg(SConfig *pCfg) {
rpcDebugFlag = cfgGetItem(pCfg, "rpcDebugFlag")->i32;
tmrDebugFlag = cfgGetItem(pCfg, "tmrDebugFlag")->i32;
jniDebugFlag = cfgGetItem(pCfg, "jniDebugFlag")->i32;
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
}
static void taosSetServerLogCfg(SConfig *pCfg) {
......@@ -493,6 +496,7 @@ static void taosSetServerLogCfg(SConfig *pCfg) {
fsDebugFlag = cfgGetItem(pCfg, "fsDebugFlag")->i32;
fnDebugFlag = cfgGetItem(pCfg, "fnDebugFlag")->i32;
smaDebugFlag = cfgGetItem(pCfg, "smaDebugFlag")->i32;
idxDebugFlag = cfgGetItem(pCfg, "idxDebugFlag")->i32;
}
static int32_t taosSetClientCfg(SConfig *pCfg) {
......
......@@ -38,7 +38,7 @@ typedef struct IndexCache {
MemTable *mem, *imm;
SIndex* index;
char* colName;
int32_t version;
int64_t version;
int64_t occupiedMem;
int8_t type;
uint64_t suid;
......@@ -47,12 +47,12 @@ typedef struct IndexCache {
TdThreadCond finished;
} IndexCache;
#define CACHE_VERSION(cache) atomic_load_32(&cache->version)
#define CACHE_VERSION(cache) atomic_load_64(&cache->version)
typedef struct CacheTerm {
// key
char* colVal;
int32_t version;
int64_t version;
// value
uint64_t uid;
int8_t colType;
......
......@@ -34,6 +34,15 @@
extern "C" {
#endif
// clang-format off
#define indexFatal(...) do { if (idxDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (idxDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (idxDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (idxDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (idxDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (idxDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
typedef enum { LT, LE, GT, GE } RangeType;
typedef enum { kTypeValue, kTypeDeletion } STermValueType;
......@@ -134,15 +143,6 @@ int32_t indexSerialCacheKey(ICacheKey* key, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
// int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// clang-format off
#define indexFatal(...) do { if (sDebugFlag & DEBUG_FATAL) { taosPrintLog("INDEX FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while (0)
#define indexError(...) do { if (sDebugFlag & DEBUG_ERROR) { taosPrintLog("INDEX ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while (0)
#define indexWarn(...) do { if (sDebugFlag & DEBUG_WARN) { taosPrintLog("INDEX WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while (0)
#define indexInfo(...) do { if (sDebugFlag & DEBUG_INFO) { taosPrintLog("INDEX ", DEBUG_INFO, 255, __VA_ARGS__); } } while (0)
#define indexDebug(...) do { if (sDebugFlag & DEBUG_DEBUG) { taosPrintLog("INDEX ", DEBUG_DEBUG, sDebugFlag, __VA_ARGS__);} } while (0)
#define indexTrace(...) do { if (sDebugFlag & DEBUG_TRACE) { taosPrintLog("INDEX ", DEBUG_TRACE, sDebugFlag, __VA_ARGS__);} } while (0)
// clang-format on
#define INDEX_TYPE_CONTAIN_EXTERN_TYPE(ty, exTy) (((ty >> 4) & (exTy)) != 0)
#define INDEX_TYPE_GET_TYPE(ty) (ty & 0x0F)
......
......@@ -28,12 +28,12 @@ extern "C" {
// tfile header content
// |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->|
// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->|
// |<-uint64_t->|<---int64_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->|
#pragma pack(push, 1)
typedef struct TFileHeader {
uint64_t suid;
int32_t version;
int64_t version;
char colName[TSDB_COL_NAME_LEN]; //
uint8_t colType;
int32_t fstOffset;
......@@ -102,14 +102,14 @@ void tfileCachePut(TFileCache* tcache, ICacheKey* key, TFileReader* read
TFileReader* tfileGetReaderByCol(IndexTFile* tf, uint64_t suid, char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName);
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName);
TFileReader* tfileReaderCreate(WriterCtx* ctx);
void tfileReaderDestroy(TFileReader* reader);
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResult* tr);
void tfileReaderRef(TFileReader* reader);
void tfileReaderUnRef(TFileReader* reader);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t type);
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t type);
void tfileWriterClose(TFileWriter* tw);
TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header);
void tfileWriterDestroy(TFileWriter* tw);
......
......@@ -567,7 +567,7 @@ static int64_t indexGetAvaialbleVer(SIndex* sIdx, IndexCache* cache) {
if (rd != NULL) {
ver += MAX(ver, rd->header.version) + 1;
indexInfo("header: %d, ver: %" PRId64 "", rd->header.version, ver);
indexInfo("header: %" PRId64 ", ver: %" PRId64 "", rd->header.version, ver);
}
return ver;
}
......
......@@ -80,7 +80,7 @@ static int32_t cacheSearchTerm(void* cache, SIndexTerm* term, SIdxTempResult* tr
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt);
......@@ -133,7 +133,7 @@ static int32_t cacheSearchCompareFunc(void* cache, SIndexTerm* term, SIdxTempRes
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
pCt->version = atomic_load_64(&pCache->version);
char* key = indexCacheTermGet(pCt);
......@@ -185,7 +185,7 @@ static int32_t cacheSearchTerm_JSON(void* cache, SIndexTerm* term, SIdxTempResul
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
pCt->version = atomic_load_64(&pCache->version);
char* exBuf = NULL;
if (INDEX_TYPE_CONTAIN_EXTERN_TYPE(term->colType, TSDB_DATA_TYPE_JSON)) {
......@@ -259,7 +259,7 @@ static int32_t cacheSearchCompareFunc_JSON(void* cache, SIndexTerm* term, SIdxTe
CacheTerm* pCt = taosMemoryCalloc(1, sizeof(CacheTerm));
pCt->colVal = term->colVal;
pCt->version = atomic_load_32(&pCache->version);
pCt->version = atomic_load_64(&pCache->version);
int8_t dType = INDEX_TYPE_GET_TYPE(term->colType);
int skip = 0;
......@@ -356,7 +356,7 @@ void indexCacheDebug(IndexCache* cache) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {
// TODO, add more debug info
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
}
}
tSkipListDestroyIter(iter);
......@@ -377,7 +377,7 @@ void indexCacheDebug(IndexCache* cache) {
CacheTerm* ct = (CacheTerm*)SL_GET_NODE_DATA(node);
if (ct != NULL) {
// TODO, add more debug info
indexInfo("{colVal: %s, version: %d} \t", ct->colVal, ct->version);
indexInfo("{colVal: %s, version: %" PRId64 "} \t", ct->colVal, ct->version);
}
}
tSkipListDestroyIter(iter);
......@@ -529,7 +529,7 @@ int indexCachePut(void* cache, SIndexTerm* term, uint64_t uid) {
ct->colVal = (char*)taosMemoryCalloc(1, sizeof(char) * (term->nColVal + 1));
memcpy(ct->colVal, term->colVal, term->nColVal);
}
ct->version = atomic_add_fetch_32(&pCache->version, 1);
ct->version = atomic_add_fetch_64(&pCache->version, 1);
// set value
ct->uid = uid;
ct->operaType = term->operType;
......@@ -663,7 +663,11 @@ static int32_t indexCacheTermCompare(const void* l, const void* r) {
// compare colVal
int32_t cmp = strcmp(lt->colVal, rt->colVal);
if (cmp == 0) {
return rt->version - lt->version;
if (rt->version == lt->version) {
cmp = 0;
} else {
cmp = rt->version < lt->version ? -1 : 1;
}
}
return cmp;
}
......
......@@ -54,9 +54,9 @@ static SArray* tfileGetFileList(const char* path);
static int tfileRmExpireFile(SArray* result);
static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version);
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version);
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version);
/*
* search from tfile
*/
......@@ -509,7 +509,7 @@ int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SIdxTempResul
return ret;
}
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const char* colName, uint8_t colType) {
TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int64_t version, const char* colName, uint8_t colType) {
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
// indexInfo("open write file name %s", fullname);
......@@ -526,7 +526,7 @@ TFileWriter* tfileWriterOpen(char* path, uint64_t suid, int32_t version, const c
return tfileWriterCreate(wcx, &tfh);
}
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int32_t version, const char* colName) {
TFileReader* tfileReaderOpen(char* path, uint64_t suid, int64_t version, const char* colName) {
char fullname[256] = {0};
tfileGenFileFullName(fullname, path, suid, colName, version);
......@@ -1019,7 +1019,7 @@ void tfileReaderUnRef(TFileReader* reader) {
static SArray* tfileGetFileList(const char* path) {
char buf[128] = {0};
uint64_t suid;
uint32_t version;
int64_t version;
SArray* files = taosArrayInit(4, sizeof(void*));
TdDirPtr pDir = taosOpenDir(path);
......@@ -1059,19 +1059,19 @@ static int tfileCompare(const void* a, const void* b) {
return strcmp(as, bs);
}
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%d.tindex", suid, col, version)) {
static int tfileParseFileName(const char* filename, uint64_t* suid, char* col, int64_t* version) {
if (3 == sscanf(filename, "%" PRIu64 "-%[^-]-%" PRId64 ".tindex", suid, col, version)) {
// read suid & colid & version success
return 0;
}
return -1;
}
// tfile name suid-colId-version.tindex
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int version) {
sprintf(filename, "%" PRIu64 "-%s-%d.tindex", suid, col, version);
static void tfileGenFileName(char* filename, uint64_t suid, const char* col, int64_t version) {
sprintf(filename, "%" PRIu64 "-%s-%" PRId64 ".tindex", suid, col, version);
return;
}
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int32_t version) {
static void tfileGenFileFullName(char* fullname, const char* path, uint64_t suid, const char* col, int64_t version) {
char filename[128] = {0};
tfileGenFileName(filename, suid, col, version);
sprintf(fullname, "%s/%s", path, filename);
......
......@@ -279,7 +279,7 @@ static void initLog() {
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
sDebugFlag = 143;
idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
......@@ -387,7 +387,7 @@ class TFileObj {
std::string path(path_);
int colId = 2;
char buf[64] = {0};
sprintf(buf, "%" PRIu64 "-%d-%d.tindex", header.suid, colId_, header.version);
sprintf(buf, "%" PRIu64 "-%d-%" PRId64 ".tindex", header.suid, colId_, header.version);
path.append("/").append(buf);
fileName_ = path;
......
......@@ -24,11 +24,7 @@
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "executor.h"
#include "executorimpl.h"
#include "indexoperator.h"
#include "os.h"
#include "index.h"
#include "stub.h"
#include "taos.h"
#include "tcompare.h"
......
......@@ -24,7 +24,7 @@ static void initLog() {
const int32_t maxLogFileNum = 10;
tsAsyncLog = 0;
sDebugFlag = 143;
idxDebugFlag = 143;
strcpy(tsLogDir, logDir.c_str());
taosRemoveDir(tsLogDir);
taosMkDir(tsLogDir);
......
......@@ -27,6 +27,13 @@ void (*taosUnRefHandle[])(void* handle) = {transUnrefSrvHandle, transUnrefCliHan
void (*transReleaseHandle[])(void* handle) = {transReleaseSrvHandle, transReleaseCliHandle};
static int32_t transValidLocalFqdn(const char* localFqdn, uint32_t* ip) {
*ip = taosGetIpv4FromFqdn(localFqdn);
if (*ip == 0xFFFFFFF) {
terrno = TSDB_CODE_RPC_FQDN_ERROR;
}
return terrno;
}
void* rpcOpen(const SRpcInit* pInit) {
SRpcInfo* pRpc = taosMemoryCalloc(1, sizeof(SRpcInfo));
if (pRpc == NULL) {
......@@ -35,7 +42,6 @@ void* rpcOpen(const SRpcInit* pInit) {
if (pInit->label) {
tstrncpy(pRpc->label, pInit->label, strlen(pInit->label) + 1);
}
// register callback handle
pRpc->cfp = pInit->cfp;
pRpc->retry = pInit->rfp;
......@@ -48,10 +54,8 @@ void* rpcOpen(const SRpcInit* pInit) {
uint32_t ip = 0;
if (pInit->connType == TAOS_CONN_SERVER) {
ip = taosGetIpv4FromFqdn(pInit->localFqdn);
if (ip == 0xFFFFFFFF) {
tError("invalid fqdn: %s", pInit->localFqdn);
terrno = TSDB_CODE_RPC_FQDN_ERROR;
if (transValidLocalFqdn(pInit->localFqdn, &ip) != 0) {
tError("invalid fqdn: %s, errmsg: %s", pInit->localFqdn, terrstr());
taosMemoryFree(pRpc);
return NULL;
}
......
......@@ -923,7 +923,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
}
if (false == taosValidIpAndPort(srv->ip, srv->port)) {
terrno = TAOS_SYSTEM_ERROR(errno);
tError("invalid ip/port, reason: %s", terrstr());
tError("invalid ip/port, %d:%d, reason: %s", srv->ip, srv->port, terrstr());
goto End;
}
if (false == addHandleToAcceptloop(srv)) {
......
......@@ -39,7 +39,7 @@
#define LOG_BUF_MUTEX(x) ((x)->buffMutex)
typedef struct {
char *buffer;
char * buffer;
int32_t buffStart;
int32_t buffEnd;
int32_t buffSize;
......@@ -58,7 +58,7 @@ typedef struct {
int32_t openInProgress;
pid_t pid;
char logName[LOG_FILE_NAME_LEN];
SLogBuff *logHandle;
SLogBuff * logHandle;
TdThreadMutex logMutex;
} SLogObj;
......@@ -96,6 +96,7 @@ int32_t fsDebugFlag = 135;
int32_t metaDebugFlag = 135;
int32_t fnDebugFlag = 135;
int32_t smaDebugFlag = 135;
int32_t idxDebugFlag = 135;
int64_t dbgEmptyW = 0;
int64_t dbgWN = 0;
......@@ -103,7 +104,7 @@ int64_t dbgSmallWN = 0;
int64_t dbgBigWN = 0;
int64_t dbgWSize = 0;
static void *taosAsyncOutputLog(void *param);
static void * taosAsyncOutputLog(void *param);
static int32_t taosPushLogBuffer(SLogBuff *pLogBuf, const char *msg, int32_t msgLen);
static SLogBuff *taosLogBuffNew(int32_t bufSize);
static void taosCloseLogByFd(TdFilePtr pFile);
......@@ -701,7 +702,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
int32_t compressSize = 163840;
int32_t ret = 0;
int32_t len = 0;
char *data = taosMemoryMalloc(compressSize);
char * data = taosMemoryMalloc(compressSize);
// gzFile dstFp = NULL;
// srcFp = fopen(srcFileName, "r");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册