提交 0cd932e8 编写于 作者: dengyihao's avatar dengyihao

refactor tindex write

上级 ebcb9be3
......@@ -27,18 +27,22 @@ extern "C" {
#endif
// tfile header content
// |<---suid--->|<---version--->|<--colLen-->|<-colName->|<---type-->|
// |<-uint64_t->|<---int32_t--->|<--int32_t->|<-colLen-->|<-uint8_t->|
// |<---suid--->|<---version--->|<-------colName------>|<---type-->|<--fstOffset->|
// |<-uint64_t->|<---int32_t--->|<--TSDB_COL_NAME_LEN-->|<-uint8_t->|<---int32_t-->|
#pragma pack(push, 1)
typedef struct TFileHeader {
uint64_t suid;
int32_t version;
char colName[128]; //
char colName[TSDB_COL_NAME_LEN]; //
uint8_t colType;
int32_t fstOffset;
} TFileHeader;
#pragma pack(pop)
#define TFILE_HEADER_SIZE (sizeof(TFileHeader) + sizeof(uint32_t))
#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
#define TFILE_HEADER_SIZE (sizeof(TFileHeader))
#define TFILE_HEADER_NO_FST (TFILE_HEADER_SIZE - sizeof(int32_t))
//#define TFILE_HADER_PRE_SIZE (sizeof(uint64_t) + sizeof(int32_t) + sizeof(int32_t))
typedef struct TFileCacheKey {
uint64_t suid;
......
......@@ -48,21 +48,23 @@ static void tfileSerialTableIdsToBuf(char* buf, SArray* tableIds) {
SERIALIZE_VAR_TO_BUF(buf, *v, uint64_t);
}
}
static FORCE_INLINE int tfileWriteFstOffset(TFileWriter* tw, int32_t offset) {
int32_t fstOffset = offset + sizeof(tw->header.fstOffset);
tw->header.fstOffset = fstOffset;
if (sizeof(fstOffset) != tw->ctx->write(tw->ctx, (char*)&fstOffset, sizeof(fstOffset))) { return -1; }
return 0;
}
static FORCE_INLINE int tfileWriteHeader(TFileWriter* writer) {
char buf[TFILE_HEADER_SIZE] = {0};
char buf[TFILE_HEADER_NO_FST] = {0};
char* p = buf;
TFileHeader* header = &writer->header;
SERIALIZE_MEM_TO_BUF(p, header, suid);
SERIALIZE_MEM_TO_BUF(p, header, version);
SERIALIZE_VAR_TO_BUF(p, strlen(header->colName), int32_t);
SERIALIZE_STR_MEM_TO_BUF(p, header, colName, strlen(header->colName));
SERIALIZE_MEM_TO_BUF(p, header, colType);
int offset = p - buf;
int nwrite = writer->ctx->write(writer->ctx, buf, offset);
if (offset != nwrite) { return -1; }
writer->offset = offset;
memcpy(buf, (char*)header, sizeof(buf));
int nwrite = writer->ctx->write(writer->ctx, buf, sizeof(buf));
if (sizeof(buf) != nwrite) { return -1; }
writer->offset = nwrite;
return 0;
}
static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
......@@ -82,26 +84,12 @@ static int tfileWriteData(TFileWriter* write, TFileValue* tval) {
}
static FORCE_INLINE int tfileReadLoadHeader(TFileReader* reader) {
// TODO simple tfile header later
char buf[TFILE_HADER_PRE_SIZE];
char buf[TFILE_HEADER_SIZE];
char* p = buf;
int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HADER_PRE_SIZE);
assert(nread == TFILE_HADER_PRE_SIZE);
TFileHeader* header = &reader->header;
memcpy(&header->suid, p, sizeof(header->suid));
p += sizeof(header->suid);
memcpy(&header->version, p, sizeof(header->version));
p += sizeof(header->version);
int32_t colLen = 0;
memcpy(&colLen, p, sizeof(colLen));
assert(colLen < sizeof(header->colName));
nread = reader->ctx->read(reader->ctx, header->colName, colLen);
assert(nread == colLen);
nread = reader->ctx->read(reader->ctx, &header->colType, sizeof(header->colType));
int64_t nread = reader->ctx->read(reader->ctx, buf, TFILE_HEADER_SIZE);
assert(nread == TFILE_HEADER_SIZE);
memcpy(&reader->header, buf, sizeof(buf));
return 0;
}
......@@ -285,7 +273,7 @@ TFileWriter* tfileWriterCreate(WriterCtx* ctx, TFileHeader* header) {
return tw;
}
int TFileWriterPut(TFileWriter* tw, void* data) {
int tfileWriterPut(TFileWriter* tw, void* data) {
// sort by coltype and write to tindex
__compar_fn_t fn = getComparFunc(tw->header.colType, 0);
taosArraySortPWithExt((SArray*)(data), tfileValueCompare, &fn);
......@@ -294,6 +282,21 @@ int TFileWriterPut(TFileWriter* tw, void* data) {
char* buf = calloc(1, sizeof(bufLimit));
char* p = buf;
int32_t sz = taosArrayGetSize((SArray*)data);
int32_t fstOffset = tw->offset;
// ugly code, refactor later
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
int32_t tbsz = taosArrayGetSize(v->tableId);
int32_t ttsz = TF_TABLE_TATOAL_SIZE(tbsz);
fstOffset += ttsz;
}
// check result or not
tfileWriteFstOffset(tw, fstOffset);
// tw->ctx->header.fstOffset = fstOffset;
// tw->ctx->write(tw->ctx, &fstOffset, sizeof(fstOffset));
for (size_t i = 0; i < sz; i++) {
TFileValue* v = taosArrayGetP((SArray*)data, i);
......@@ -311,7 +314,7 @@ int TFileWriterPut(TFileWriter* tw, void* data) {
tfileSerialTableIdsToBuf(p, v->tableId);
offset += ttsz;
p = buf + offset;
// set up value offset and
// set up value offset
v->offset = tw->offset;
tw->offset += ttsz;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册