提交 a655dd2e 编写于 作者: dengyihao's avatar dengyihao

update tindex write

上级 7d8e5c3a
...@@ -31,6 +31,7 @@ typedef struct WriterCtx { ...@@ -31,6 +31,7 @@ typedef struct WriterCtx {
int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*write)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len); int (*read)(struct WriterCtx* ctx, uint8_t* buf, int len);
int (*flush)(struct WriterCtx* ctx); int (*flush)(struct WriterCtx* ctx);
int (*readFrom)(struct WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
WriterType type; WriterType type;
union { union {
struct { struct {
...@@ -48,6 +49,7 @@ typedef struct WriterCtx { ...@@ -48,6 +49,7 @@ typedef struct WriterCtx {
static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len); static int writeCtxDoWrite(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len); static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len);
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset);
static int writeCtxDoFlush(WriterCtx* ctx); static int writeCtxDoFlush(WriterCtx* ctx);
WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity); WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int32_t capacity);
......
...@@ -39,6 +39,17 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) { ...@@ -39,6 +39,17 @@ static int writeCtxDoRead(WriterCtx* ctx, uint8_t* buf, int len) {
return nRead; return nRead;
} }
static int writeCtxDoReadFrom(WriterCtx* ctx, uint8_t* buf, int len, int32_t offset) {
int nRead = 0;
if (ctx->type == TFile) {
tfLseek(ctx->file.fd, offset, 0);
nRead = tfRead(ctx->file.fd, buf, len);
} else {
// refactor later
assert(0);
}
return nRead;
}
static int writeCtxDoFlush(WriterCtx* ctx) { static int writeCtxDoFlush(WriterCtx* ctx) {
if (ctx->type == TFile) { if (ctx->type == TFile) {
// tfFsync(ctx->fd); // tfFsync(ctx->fd);
...@@ -73,6 +84,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int ...@@ -73,6 +84,7 @@ WriterCtx* writerCtxCreate(WriterType type, const char* path, bool readOnly, int
ctx->write = writeCtxDoWrite; ctx->write = writeCtxDoWrite;
ctx->read = writeCtxDoRead; ctx->read = writeCtxDoRead;
ctx->flush = writeCtxDoFlush; ctx->flush = writeCtxDoFlush;
ctx->readFrom = writeCtxDoReadFrom;
ctx->offset = 0; ctx->offset = 0;
ctx->limit = capacity; ctx->limit = capacity;
......
...@@ -34,18 +34,18 @@ typedef struct TFileValue { ...@@ -34,18 +34,18 @@ typedef struct TFileValue {
static int tfileValueCompare(const void* a, const void* b, const void* param); static int tfileValueCompare(const void* a, const void* b, const void* param);
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds); static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteHeader(TFileWriter* writer); static int tfileWriteHeader(TFileWriter* writer);
static int tfileWriteFstOffset(TFileWriter* tw, int32_t offset);
static int tfileWriteData(TFileWriter* write, TFileValue* tval); static int tfileWriteData(TFileWriter* write, TFileValue* tval);
static int tfileReadLoadHeader(TFileReader* reader); static int tfileReadLoadHeader(TFileReader* reader);
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result);
static int tfileGetFileList(const char* path, SArray* result); static int tfileGetFileList(const char* path, SArray* result);
static void tfileDestroyFileName(void* elem); static void tfileDestroyFileName(void* elem);
static int tfileCompare(const void* a, const void* b); static int tfileCompare(const void* a, const void* b);
static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version); static int tfileParseFileName(const char* filename, uint64_t* suid, int* colId, int* version);
static void tfileSerialCacheKey(TFileCacheKey* key, char* buf); static void tfileSerialCacheKey(TFileCacheKey* key, char* buf);
// static tfileGetCompareFunc(uint8_t byte) {}
TFileCache* tfileCacheCreate(const char* path) { TFileCache* tfileCacheCreate(const char* path) {
TFileCache* tcache = calloc(1, sizeof(TFileCache)); TFileCache* tcache = calloc(1, sizeof(TFileCache));
...@@ -142,19 +142,22 @@ void tfileReaderDestroy(TFileReader* reader) { ...@@ -142,19 +142,22 @@ void tfileReaderDestroy(TFileReader* reader) {
int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) { int tfileReaderSearch(TFileReader* reader, SIndexTermQuery* query, SArray* result) {
SIndexTerm* term = query->term; SIndexTerm* term = query->term;
// refactor to callback later // refactor to callback later
if (query->qType == QUERY_TERM) { if (query->qType == QUERY_TERM) {
uint64_t offset; uint64_t offset;
FstSlice key = fstSliceCreate(term->colVal, term->nColVal); FstSlice key = fstSliceCreate(term->colVal, term->nColVal);
if (fstGet(reader->fst, &key, &offset)) { if (fstGet(reader->fst, &key, &offset)) {
// return tfileReadLoadTableIds(reader, offset, result);
} else { } else {
indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal); indexInfo("index: %" PRIu64 ", col: %s, colVal: %s, not found in tindex", term->suid, term->colName, term->colVal);
} }
return 0; return 0;
} else if (query->qType == QUERY_PREFIX) { } else if (query->qType == QUERY_PREFIX) {
// handle later
// //
// } else {
// handle later
} }
return 0; return 0;
} }
...@@ -198,13 +201,10 @@ int tfileWriterPut(TFileWriter* tw, void* data) { ...@@ -198,13 +201,10 @@ int tfileWriterPut(TFileWriter* tw, void* data) {
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
int32_t tbsz = taosArrayGetSize(v->tableId); int32_t tbsz = taosArrayGetSize(v->tableId);
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz); fstOffset += TF_TABLE_TATOAL_SIZE(tbsz);
fstOffset += ttsz;
} }
// check result or not // check result or not
tfileWriteFstOffset(tw, fstOffset); tfileWriteFstOffset(tw, fstOffset);
// tw->ctx->header.fstOffset = fstOffset;
// tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset));
for (size_t i = 0; i < sz; i++) { for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i); TFileValue* v = taosArrayGetP((SArray*)data, i);
...@@ -287,11 +287,11 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) { ...@@ -287,11 +287,11 @@ static int tfileValueCompare(const void* a, const void* b, const void* param) {
return fn(av->colVal, bv->colVal); return fn(av->colVal, bv->colVal);
} }
static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) { static void tfileSerialTableIdsToBuf(char* buf, SArray* ids) {
int tbSz = taosArrayGetSize(tableIds); int sz = taosArrayGetSize(ids);
SERIALIZE_VAR_TO_BUF(buf, tbSz, int32_t); SERIALIZE_VAR_TO_BUF(buf, sz, int32_t);
for (size_t i = 0; i < tbSz; i++) { for (size_t i = 0; i < sz; i++) {
uint64_t* v = taosArrayGet(tableIds, i); uint64_t* v = taosArrayGet(ids, i);
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t); SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
} }
} }
...@@ -328,6 +328,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) { ...@@ -328,6 +328,7 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
} else { } else {
// handle other type later // handle other type later
} }
return 0;
} }
static int tfileReadLoadHeader(TFileReader* reader) { static int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later // TODO simple tfile header later
...@@ -339,6 +340,23 @@ static int tfileReadLoadHeader(TFileReader* reader) { ...@@ -339,6 +340,23 @@ static int tfileReadLoadHeader(TFileReader* reader) {
memcpy(&reader->header, buf, sizeof(buf)); memcpy(&reader->header, buf, sizeof(buf));
return 0; return 0;
} }
static int tfileReadLoadTableIds(TFileReader* reader, int32_t offset, SArray* result) {
int32_t nid;
WriterCtx* ctx = reader->ctx;
int32_t nread = ctx->readFrom(ctx, (char*)&nid, sizeof(nid), offset);
assert(sizeof(nid) == nread);
char* buf = calloc(1, sizeof(uint64_t) * nid);
if (buf == NULL) { return -1; }
nread = ctx->read(ctx, buf, sizeof(uint64_t) * nid);
uint64_t* ids = (uint64_t*)buf;
for (int32_t i = 0; i < nid; i++) {
taosArrayPush(result, ids + i);
}
free(buf);
return 0;
}
static int tfileGetFileList(const char* path, SArray* result) { static int tfileGetFileList(const char* path, SArray* result) {
DIR* dir = opendir(path); DIR* dir = opendir(path);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册