提交 93638f09 编写于 作者: L Larry Hamel 提交者: C.J. Jameson

gpperfmon: remove filerep stat collection

Signed-off-by: NMarbin Tan <mtan@pivotal.io>
上级 e5f54069
......@@ -61,16 +61,6 @@ typedef struct qdnode_t {
apr_hash_t* query_seginfo_hash;
} qdnode_t;
typedef struct gpmon_filrep_holder_t
{
gpmmon_filerep_key_t key;
double elapsedTime_secs_primary;
double elapsedTime_secs_mirror;
gpmon_filerep_primarystats_s primary;
gpmon_filerep_mirrorstats_s mirror;
} gpmon_filrep_holder_t;
struct agg_t
{
apr_int64_t generation;
......@@ -79,7 +69,6 @@ struct agg_t
apr_hash_t* qtab; /* key = gpmon_qlog_key_t, value = qdnode ptr. */
apr_hash_t* htab; /* key = hostname, value = gpmon_metrics_t ptr */
apr_hash_t* stab; /* key = databaseid, value = gpmon_seginfo_t ptr */
apr_hash_t* ftab; /* key = gpmmon_filerep_key_t, value = gpmon_filrep_holder_t ptr */
apr_hash_t* fsinfotab; /* This is the persistent fsinfo hash table: key = gpmon_fsinfokey_t, value = mmon_fsinfo_t ptr */
};
......@@ -329,44 +318,6 @@ static apr_status_t agg_put_segment(agg_t* agg, const gpmon_seginfo_t* seg)
return 0;
}
static apr_status_t agg_put_filerep(agg_t* agg, const gpmon_filerepinfo_t* filerep)
{
gpmon_filrep_holder_t* rec;
int addtoHash = 0;
rec = apr_hash_get(agg->ftab, &filerep->key.dkey, sizeof(filerep->key.dkey));
if (!rec)
{
// use pcalloc to initialize to 0
rec = apr_pcalloc(agg->pool, sizeof(*rec));
if (!rec)
{
return APR_ENOMEM;
}
memcpy(&rec->key, &filerep->key.dkey, (sizeof(rec->key)));
addtoHash = 1;
}
if (filerep->key.isPrimary)
{
rec->primary = filerep->stats.primary;
rec->elapsedTime_secs_primary = filerep->elapsedTime_secs;
}
else
{
rec->mirror = filerep->stats.mirror;
rec->elapsedTime_secs_mirror = filerep->elapsedTime_secs;
}
if (addtoHash)
{
apr_hash_set(agg->ftab, &rec->key, sizeof(rec->key), rec);
}
return 0;
}
static apr_status_t agg_put_query_metrics(agg_t* agg, const gpmon_qlog_t* qlog, apr_int64_t generation)
{
qdnode_t* node;
......@@ -536,12 +487,6 @@ apr_status_t agg_create(agg_t** retagg, apr_int64_t generation, apr_pool_t* pare
return APR_ENOMEM;
}
agg->ftab = apr_hash_make(pool);
if (!agg->ftab) {
apr_pool_destroy(pool);
return APR_ENOMEM;
}
*retagg = agg;
return 0;
}
......@@ -694,8 +639,6 @@ apr_status_t agg_put(agg_t* agg, const gp_smon_to_mmon_packet_t* pkt)
return agg_put_qexec(agg, &pkt->u.qexec_packet, agg->generation);
if (pkt->header.pkttype == GPMON_PKTTYPE_SEGINFO)
return agg_put_segment(agg, &pkt->u.seginfo);
if (pkt->header.pkttype == GPMON_PKTTYPE_FILEREP)
return agg_put_filerep(agg, &pkt->u.filerepinfo);
if (pkt->header.pkttype == GPMON_PKTTYPE_QUERY_HOST_METRICS)
return agg_put_query_metrics(agg, &pkt->u.qlog, agg->generation);
if (pkt->header.pkttype == GPMON_PKTTYPE_FSINFO)
......@@ -719,7 +662,6 @@ static void delete_old_files(bloom_t* bloom);
static apr_uint32_t write_fsinfo(agg_t* agg, const char* nowstr);
static apr_uint32_t write_system(agg_t* agg, const char* nowstr);
static apr_uint32_t write_segmentinfo(agg_t* agg, char* nowstr);
static apr_uint32_t write_filerepinfo(agg_t* agg, char* nowstr);
static apr_uint32_t write_dbmetrics(dbmetrics_t* dbmetrics, char* nowstr);
static apr_uint32_t write_qlog(FILE* fp, qdnode_t *qdnode, const char* nowstr, apr_uint32_t done);
static apr_uint32_t write_qlog_full(FILE* fp, qdnode_t *qdnode, const char* nowstr);
......@@ -784,10 +726,6 @@ apr_status_t agg_dump(agg_t* agg)
bloom_set(&bloom, GPMON_DIR "segment_tail.dat");
bloom_set(&bloom, GPMON_DIR "segment_stage.dat");
bloom_set(&bloom, GPMON_DIR "_segment_tail.dat");
bloom_set(&bloom, GPMON_DIR "filerep_now.dat");
bloom_set(&bloom, GPMON_DIR "filerep_tail.dat");
bloom_set(&bloom, GPMON_DIR "filerep_stage.dat");
bloom_set(&bloom, GPMON_DIR "_filerep_tail.dat");
bloom_set(&bloom, GPMON_DIR "diskspace_now.dat");
bloom_set(&bloom, GPMON_DIR "diskspace_tail.dat");
bloom_set(&bloom, GPMON_DIR "diskspace_stage.dat");
......@@ -802,10 +740,6 @@ apr_status_t agg_dump(agg_t* agg)
temp_bytes_written = write_segmentinfo(agg, nowstr);
incremement_tail_bytes(temp_bytes_written);
/* write filerep metrics */
temp_bytes_written = write_filerepinfo(agg, nowstr);
incremement_tail_bytes(temp_bytes_written);
/* write fsinfo metrics */
temp_bytes_written = write_fsinfo(agg, nowstr);
incremement_tail_bytes(temp_bytes_written);
......@@ -939,7 +873,6 @@ apr_status_t agg_dump(agg_t* agg)
rename(GPMON_DIR "_iterators_now.dat", GPMON_DIR "iterators_now.dat");
rename(GPMON_DIR "_queries_now.dat", GPMON_DIR "queries_now.dat");
rename(GPMON_DIR "_database_now.dat", GPMON_DIR "database_now.dat");
rename(GPMON_DIR "_filerep_now.dat", GPMON_DIR "filerep_now.dat");
rename(GPMON_DIR "_diskspace_now.dat", GPMON_DIR "diskspace_now.dat");
/* clean up ... delete all old files by checking our bloom filter */
......@@ -1034,95 +967,6 @@ static void delete_old_files(bloom_t* bloom)
}
}
static apr_uint32_t write_filerepinfo(agg_t* agg, char* nowstr)
{
const int million = 1000000;
apr_uint32_t bytes_written = 0;
FILE* fp = fopen(GPMON_DIR "filerep_tail.dat", "a");
FILE* fp2 = fopen(GPMON_DIR "_filerep_now.dat", "w");
apr_hash_index_t* hi;
const int line_size = 2048;
char line[line_size];
if (!fp || !fp2)
{
if (fp) fclose(fp);
if (fp2) fclose(fp2);
return 0;
}
for (hi = apr_hash_first(0, agg->ftab); hi; hi = apr_hash_next(hi))
{
gpmon_filrep_holder_t* sp;
int bytes_this_record;
void* valptr = 0;
apr_hash_this(hi, 0, 0, (void**) &valptr);
sp = (gpmon_filrep_holder_t*) valptr;
snprintf(line, line_size, "%s|%u|%u|%s|%d|%s|%d|" // through mirror_port
"%d|%d|%d|%d|%.2f|" // primary write syscall
"%d|%d|%.2f|" // primary fsync syscall
"%d|%d|%d|%d|%.2f|" // primary write shmem
"%d|%d|%.2f|" // primary fsync shmem
"%d|%d|%.2f|" // primary fsync roundtrip
"%d|%d|%.2f|" // primary test roundtrip
"%d|%d|%d|%d|%.2f|" // mirror write syscall
"%d|%d|%.2f", // mirror fsync syscall
nowstr,
(apr_uint32_t)(sp->elapsedTime_secs_primary * million),
(apr_uint32_t)(sp->elapsedTime_secs_primary * million),
sp->key.primary_hostname,
sp->key.primary_port,
sp->key.mirror_hostname,
sp->key.mirror_port,
sp->primary.write_syscall_size_avg,
sp->primary.write_syscall_size_max,
sp->primary.write_syscall_time_avg,
sp->primary.write_syscall_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.write_syscall_count / sp->elapsedTime_secs_primary):0),
sp->primary.fsync_syscall_time_avg,
sp->primary.fsync_syscall_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.fsync_syscall_count / sp->elapsedTime_secs_primary):0),
sp->primary.write_shmem_size_avg,
sp->primary.write_shmem_size_max,
sp->primary.write_shmem_time_avg,
sp->primary.write_shmem_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.write_shmem_count / sp->elapsedTime_secs_primary):0),
sp->primary.fsync_shmem_time_avg,
sp->primary.fsync_shmem_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.fsync_shmem_count / sp->elapsedTime_secs_primary):0),
sp->primary.roundtrip_fsync_msg_time_avg,
sp->primary.roundtrip_fsync_msg_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.roundtrip_fsync_msg_count / sp->elapsedTime_secs_primary):0),
sp->primary.roundtrip_test_msg_time_avg,
sp->primary.roundtrip_test_msg_time_max,
((sp->elapsedTime_secs_primary >0)?(sp->primary.roundtrip_test_msg_count / sp->elapsedTime_secs_primary):0),
sp->mirror.write_syscall_size_avg,
sp->mirror.write_syscall_size_max,
sp->mirror.write_syscall_time_avg,
sp->mirror.write_syscall_time_max,
((sp->elapsedTime_secs_mirror >0)?(sp->mirror.write_syscall_count / sp->elapsedTime_secs_mirror):0),
sp->mirror.fsync_syscall_time_avg,
sp->mirror.fsync_syscall_time_max,
((sp->elapsedTime_secs_mirror >0)?(sp->mirror.fsync_syscall_count / sp->elapsedTime_secs_mirror):0));
bytes_this_record = strlen(line) + 1;
if (bytes_this_record == line_size)
{
gpmon_warning(FLINE, "filerep stats line to too long ... ignored: %s", line);
continue;
}
fprintf(fp, "%s\n", line);
fprintf(fp2, "%s\n", line);
bytes_written += bytes_this_record;
}
fclose(fp);
fclose(fp2);
return bytes_written;
}
static apr_uint32_t write_segmentinfo(agg_t* agg, char* nowstr)
{
FILE* fp = fopen(GPMON_DIR "segment_tail.dat", "a");
......
......@@ -1067,7 +1067,7 @@ apr_status_t call_for_each_table(eachtablefunc, apr_pool_t*, PGconn*);
apr_status_t call_for_each_table_with_opt(eachtablefuncwithopt, apr_pool_t*, PGconn*, mmon_options_t*);
char* all_tables[] = { "system", "queries", "iterators", "database", "segment", "filerep", "diskspace" };
char* all_tables[] = { "system", "queries", "iterators", "database", "segment", "diskspace" };
apr_status_t call_for_each_table(eachtablefunc func, apr_pool_t* pool, PGconn* conn)
{
......
......@@ -54,8 +54,6 @@ inline size_t get_size_by_pkttype_smon_to_mmon(apr_int16_t pkttype)
return(sizeof(qexec_packet_t));
case GPMON_PKTTYPE_SEGINFO:
return(sizeof(gpmon_seginfo_t));
case GPMON_PKTTYPE_FILEREP:
return(sizeof(gpmon_filerepinfo_t));
case GPMON_PKTTYPE_QUERY_HOST_METRICS:
return(sizeof(gpmon_qlog_t));
case GPMON_PKTTYPE_FSINFO:
......
......@@ -231,7 +231,6 @@ typedef struct gp_smon_to_mmon_packet_t {
gpmon_qlog_t qlog;
qexec_packet_t qexec_packet;
gpmon_seginfo_t seginfo;
gpmon_filerepinfo_t filerepinfo;
gpmon_fsinfo_t fsinfo;
gpmon_query_seginfo_t queryseg;
} u;
......
......@@ -8,59 +8,6 @@
-- Violate the above and you _will_ break upgrade.
--
-- TABLE: filerep_history
-- ctime time of measurement
-- primary_measurement_microsec elapsed seconds for primary measurements
-- mirror_measurement_microsec elapsed seconds for mirror measurements
-- primary_hostname key -- primary segment
-- primary_port key -- primary segment
-- mirror_hostname key -- mirror segment
-- mirror_port key -- mirror segment
-- primary_write_syscall_bytes_avg write system calls on primary
-- primary_write_syscall_byte_max
-- primary_write_syscall_microsecs_avg
-- primary_write_syscall_microsecs_max
-- primary_write_syscall_per_sec
-- primary_fsync_syscall_microsec_avg fysnc system calls on primary
-- primary_fsync_syscall_microsec_max
-- primary_fsync_syscall_per_sec
-- primary_write_shmem_bytes_avg putting write messages into shared memory on primary
-- primary_write_shmem_bytes_max
-- primary_write_shmem_microsec_avg
-- primary_write_shmem_microsec_max
-- primary_write_shmem_per_sec
-- primary_fsync_shmem_microsec_avg putting fsync messages into shared memory on primary
-- primary_fsync_shmem_microsec_max
-- primary_fsync_shmem_per_sec
-- primary_roundtrip_fsync_msg_microsec_avg Roundtrip from sending fsync message to mirror to get ack back on primary
-- primary_roundtrip_fsync_msg_microsec_max
-- primary_roundtrip_fsync_msg_per_sec
-- primary_roundtrip_test_msg_microsec_avg Roundtrip from sending test message to mirror to getting back on primary
-- primary_roundtrip_test_msg_microsec_max
-- primary_roundtrip_test_msg_per_sec
-- mirror_write_syscall_size_avg write system call on mirror
-- mirror_write_syscall_size_max
-- mirror_write_syscall_microsec_avg
-- mirror_write_syscall_microsec_max
-- mirror_write_syscall_per_sec
-- mirror_fsync_syscall_microsec_avg fsync system call on mirror
-- mirror_fsync_syscall_microsec_max
-- mirror_fsync_syscall_per_sec
create table public.filerep_history (ctime timestamp(0) not null, primary_measurement_microsec bigint not null, mirror_measurement_microsec bigint not null, primary_hostname varchar(64) not null, primary_port int not null, mirror_hostname varchar(64) not null, mirror_port int not null, primary_write_syscall_bytes_avg bigint not null, primary_write_syscall_byte_max bigint not null, primary_write_syscall_microsecs_avg bigint not null, primary_write_syscall_microsecs_max bigint not null, primary_write_syscall_per_sec float not null, primary_fsync_syscall_microsec_avg bigint not null, primary_fsync_syscall_microsec_max bigint not null, primary_fsync_syscall_per_sec float not null, primary_write_shmem_bytes_avg bigint not null, primary_write_shmem_bytes_max bigint not null, primary_write_shmem_microsec_avg bigint not null, primary_write_shmem_microsec_max bigint not null, primary_write_shmem_per_sec float not null, primary_fsync_shmem_microsec_avg bigint not null, primary_fsync_shmem_microsec_max bigint not null, primary_fsync_shmem_per_sec float not null, primary_roundtrip_fsync_msg_microsec_avg bigint not null, primary_roundtrip_fsync_msg_microsec_max bigint not null, primary_roundtrip_fsync_msg_per_sec float not null, primary_roundtrip_test_msg_microsec_avg bigint not null, primary_roundtrip_test_msg_microsec_max bigint not null, primary_roundtrip_test_msg_per_sec float not null, mirror_write_syscall_size_avg bigint not null, mirror_write_syscall_size_max bigint not null, mirror_write_syscall_microsec_avg bigint not null, mirror_write_syscall_microsec_max bigint not null, mirror_write_syscall_per_sec float not null, mirror_fsync_syscall_microsec_avg bigint not null, mirror_fsync_syscall_microsec_max bigint not null, mirror_fsync_syscall_per_sec float not null) with (fillfactor=100) distributed by (ctime) partition by range (ctime)(start (date '2010-01-01') end (date '2010-02-01') EVERY (interval '1 month'));
--- TABLE: filerep_now
-- (like filerep_history)
create external web table public.filerep_now (like public.filerep_history) execute 'cat gpperfmon/data/filerep_now.dat 2> /dev/null || true' on master format 'text' (delimiter '|' NULL as 'null');
-- TABLE: diskpace_tail
-- (like filerep_history)
create external web table public.filerep_tail (like public.filerep_history) execute 'cat gpperfmon/data/filerep_tail.dat 2> /dev/null || true' on master format 'text' (delimiter '|' NULL as 'null');
-- TABLE: _filerep_tail
-- (like filerep_history)
create external web table public._filerep_tail (like public.filerep_history) execute 'cat gpperfmon/data/_filerep_tail.dat 2> /dev/null || true' on master format 'text' (delimiter '|' NULL as 'null');
-- TABLE: diskspace_history
-- ctime time of measurement
-- hostname hostname of measurement
......
......@@ -111,7 +111,6 @@ struct gx_t
apr_hash_t* qlogtab; /* stores qlog packets */
apr_hash_t* segmenttab; /* stores segment packets */
apr_hash_t* pidtab; /* key=pid, value=pidrec_t */
apr_hash_t* filereptab; /* stores gpmon_filerepinfo_t packets */
apr_hash_t* querysegtab; /* stores gpmon_query_seginfo_t */
};
......@@ -175,19 +174,6 @@ void update_log_filename()
tm->tm_sec);
}
typedef struct gpsmon_filerepinfo_t
{
// KEY
char primary_hostname[NAMEDATALEN];
apr_uint16_t primary_port;
char mirror_hostname[NAMEDATALEN];
apr_uint16_t mirror_port;
bool isPrimary;
} gpsmon_filerepinfo_t;
static void gx_accept(SOCKET sock, short event, void* arg);
static void gx_recvfrom(SOCKET sock, short event, void* arg);
static apr_uint32_t create_qexec_packet(const gpmon_qexec_t* qexec, gp_smon_to_mmon_packet_t* pkt);
......@@ -211,9 +197,6 @@ static inline void copy_union_packet_gp_smon_to_mmon(gp_smon_to_mmon_packet_t* p
case GPMON_PKTTYPE_SEGINFO:
memcpy(&pkt->u.seginfo, &pkt_src->u.seginfo, sizeof(gpmon_seginfo_t));
break;
case GPMON_PKTTYPE_FILEREP:
memcpy(&pkt->u.filerepinfo, &pkt_src->u.filerepinfo, sizeof(gpmon_filerepinfo_t));
break;
case GPMON_PKTTYPE_QUERY_HOST_METRICS:
memcpy(&pkt->u.qlog, &pkt_src->u.qlog, sizeof(gpmon_qlog_t));
break;
......@@ -1013,7 +996,6 @@ static void gx_gettcpcmd(SOCKET sock, short event, void* arg)
apr_hash_t* qdtab;
apr_hash_t* pidtab;
apr_hash_t* segtab;
apr_hash_t* filereptab;
if (event & EV_TIMEOUT) // didn't get command from gpmmon, quit
{
if(gx.tcp_sock)
......@@ -1040,7 +1022,6 @@ static void gx_gettcpcmd(SOCKET sock, short event, void* arg)
qdtab = gx.qlogtab;
pidtab = gx.pidtab;
segtab = gx.segmenttab;
filereptab = gx.filereptab;
querysegtab = gx.querysegtab;
oldpool = apr_hash_pool_get(qetab);
......@@ -1064,10 +1045,6 @@ static void gx_gettcpcmd(SOCKET sock, short event, void* arg)
gx.segmenttab = apr_hash_make(newpool);
CHECKMEM(gx.segmenttab);
/* filerep hash table */
gx.filereptab = apr_hash_make(newpool);
CHECKMEM(gx.filereptab);
/* queryseg hash table */
gx.querysegtab = apr_hash_make(newpool);
CHECKMEM(gx.querysegtab);
......@@ -1091,18 +1068,6 @@ static void gx_gettcpcmd(SOCKET sock, short event, void* arg)
int count = 0;
apr_hash_t* query_cpu_table = NULL;
for (hi = apr_hash_first(0, filereptab); hi; hi = apr_hash_next(hi))
{
apr_hash_this(hi, 0, 0, &vptr);
ppkt = vptr;
if (ppkt->header.pkttype != GPMON_PKTTYPE_FILEREP)
continue;
TR2(("sending magic %x, pkttype %d\n", ppkt->header.magic, ppkt->header.pkttype));
send_smon_to_mon_pkt(sock, ppkt);
count++;
}
for (hi = apr_hash_first(0, querysegtab); hi; hi = apr_hash_next(hi))
{
apr_hash_this(hi, 0, 0, &vptr);
......@@ -1400,163 +1365,6 @@ static void update_count_value(apr_uint32_t* total, apr_uint32_t newdata)
*total += newdata;
}
static void accumulate_filerep_primary_data(gpmon_filerep_primarystats_s* total, gpmon_filerep_primarystats_s* newdata)
{
// ALWAYS UPDATE AVERAGES BEFORE COUNTS ... AVERAGE UPDATE USES COUNT
// write_syscall_size_avg
update_avg_value(total->write_syscall_count, &total->write_syscall_size_avg,
newdata->write_syscall_count, newdata->write_syscall_size_avg);
// write_syscall_size_max
update_max_value(&total->write_syscall_size_max, newdata->write_syscall_size_max);
// write_syscall_time_avg
update_avg_value(total->write_syscall_count, &total->write_syscall_time_avg,
newdata->write_syscall_count, newdata->write_syscall_time_avg);
// write_syscall_time_max
update_max_value(&total->write_syscall_time_max, newdata->write_syscall_time_max);
// write_syscall_count
update_count_value(&total->write_syscall_count, newdata->write_syscall_count);
// fsync_syscall_time_avg
update_avg_value(total->fsync_syscall_count, &total->fsync_syscall_time_avg,
newdata->fsync_syscall_count, newdata->fsync_syscall_time_avg);
// fsync_syscall_time_max
update_max_value(&total->fsync_syscall_time_max, newdata->fsync_syscall_time_max);
// fsync_syscall_count
update_count_value(&total->fsync_syscall_count, newdata->fsync_syscall_count);
// write_shmem_size_avg
update_avg_value(total->write_shmem_count, &total->write_shmem_size_avg,
newdata->write_shmem_count, newdata->write_shmem_size_avg);
// write_shmem_size_max
update_max_value(&total->write_shmem_size_max, newdata->write_shmem_size_max);
// write_shmem_time_avg
update_avg_value(total->write_shmem_count, &total->write_shmem_time_avg,
newdata->write_shmem_count, newdata->write_shmem_time_avg);
// write_shmem_time_max
update_max_value(&total->write_shmem_time_max, newdata->write_shmem_time_max);
// write_shmem_count
update_count_value(&total->write_shmem_count, newdata->write_shmem_count);
// fsync_shmem_time_avg
update_avg_value(total->fsync_shmem_count, &total->fsync_shmem_time_avg,
newdata->fsync_shmem_count, newdata->fsync_shmem_time_avg);
// fsync_shmem_time_max
update_max_value(&total->fsync_shmem_time_max, newdata->fsync_shmem_time_max);
// fsync_shmem_count
update_count_value(&total->fsync_shmem_count, newdata->fsync_shmem_count);
// roundtrip_fsync_msg_time_avg
update_avg_value(total->roundtrip_fsync_msg_count, &total->roundtrip_fsync_msg_time_avg,
newdata->roundtrip_fsync_msg_count, newdata->roundtrip_fsync_msg_time_avg);
// roundtrip_fsync_msg_time_max
update_max_value(&total->roundtrip_fsync_msg_time_max, newdata->roundtrip_fsync_msg_time_max);
// roundtrip_fsync_msg_count
update_count_value(&total->roundtrip_fsync_msg_count, newdata->roundtrip_fsync_msg_count);
// roundtrip_test_msg_time_avg
update_avg_value(total->roundtrip_test_msg_count, &total->roundtrip_test_msg_time_avg,
newdata->roundtrip_test_msg_count, newdata->roundtrip_test_msg_time_avg);
// roundtrip_test_msg_time_max
update_max_value(&total->roundtrip_test_msg_time_max, newdata->roundtrip_test_msg_time_max);
// roundtrip_test_msg_count
update_count_value(&total->roundtrip_test_msg_count, newdata->roundtrip_test_msg_count);
}
static void accumulate_filerep_mirror_data(gpmon_filerep_mirrorstats_s* total, gpmon_filerep_mirrorstats_s* newdata)
{
// ALWAYS UPDATE AVERAGES BEFORE COUNTS ... AVERAGE UPDATE USES COUNT
// write_syscall_size_avg
update_avg_value(total->write_syscall_count, &total->write_syscall_size_avg,
newdata->write_syscall_count, newdata->write_syscall_size_avg);
// write_syscall_size_max
update_max_value(&total->write_syscall_size_max, newdata->write_syscall_size_max);
// write_syscall_time_avg;
update_avg_value(total->write_syscall_count, &total->write_syscall_time_avg,
newdata->write_syscall_count, newdata->write_syscall_time_avg);
// write_syscall_time_max
update_max_value(&total->write_syscall_time_max, newdata->write_syscall_time_max);
// write_syscall_count
update_count_value(&total->write_syscall_count, newdata->write_syscall_count);
// fsync_syscall_time_avg;
update_avg_value(total->fsync_syscall_count, &total->fsync_syscall_time_avg,
newdata->fsync_syscall_count, newdata->fsync_syscall_time_avg);
// fsync_syscall_time_max
update_max_value(&total->fsync_syscall_time_max, newdata->fsync_syscall_time_max);
// fsync_syscall_count
update_count_value(&total->fsync_syscall_count, newdata->fsync_syscall_count);
}
static void accumulate_filerep_data_in_packet(gpmon_filerepinfo_t* total, gpmon_filerepinfo_t* newdata)
{
if (total->key.isPrimary != newdata->key.isPrimary)
{
gpmon_warning(FLINE, "filerep unexpected key mismatch");
return;
}
total->elapsedTime_secs += newdata->elapsedTime_secs;
if (total->key.isPrimary)
{
accumulate_filerep_primary_data(&total->stats.primary, &newdata->stats.primary);
}
else
{
accumulate_filerep_mirror_data(&total->stats.mirror, &newdata->stats.mirror);
}
}
static void gx_recvfilerep(gpmon_packet_t* pkt)
{
gpmon_filerepinfo_t* p;
gp_smon_to_mmon_packet_t* rec = NULL;
if (pkt->pkttype != GPMON_PKTTYPE_FILEREP)
gpsmon_fatal(FLINE, "assert failed; expected pkttype filerep");
p = &pkt->u.filerepinfo;
TR2(("Received filerep packet primary %s:%d mirror %s:%d isPrimary(%d)\n",
p->key.dkey.primary_hostname, p->key.dkey.primary_port, p->key.dkey.mirror_hostname, p->key.dkey.mirror_port));
rec = apr_hash_get(gx.filereptab, &p->key, sizeof(p->key));
if (rec)
{
accumulate_filerep_data_in_packet(&rec->u.filerepinfo, &pkt->u.filerepinfo);
}
else
{
rec = gx_pkt_to_smon_to_mmon(apr_hash_pool_get(gx.filereptab), pkt);
apr_hash_set(gx.filereptab, &rec->u.filerepinfo.key, sizeof(rec->u.filerepinfo.key), rec);
}
}
static void gx_recvsegment(gpmon_packet_t* pkt)
{
gpmon_seginfo_t* p;
......@@ -1899,9 +1707,6 @@ static void gx_recvfrom(SOCKET sock, short event, void* arg)
case GPMON_PKTTYPE_QEXEC:
gx_recvqexec(&pkt);
break;
case GPMON_PKTTYPE_FILEREP:
gx_recvfilerep(&pkt);
break;
default:
gpmon_warning(FLINE, "unexpected packet type %d", pkt.pkttype);
return;
......@@ -2182,10 +1987,6 @@ static void setup_gx(int port, apr_int64_t signature)
gx.segmenttab = apr_hash_make(subpool);
CHECKMEM(gx.segmenttab);
/* filerep hash table */
gx.filereptab = apr_hash_make(subpool);
CHECKMEM(gx.filereptab);
/* queryseg hash table */
gx.querysegtab = apr_hash_make(subpool);
CHECKMEM(gx.querysegtab);
......
此差异已折叠。
此差异已折叠。
......@@ -1238,22 +1238,6 @@ int
FileWrite(File file, char *buffer, int amount)
{
int returnCode;
FileRepGpmonRecord_s gpmonRecord;
FileRepGpmonStatType_e whichStat =0;
if (fileRepRole == FileRepPrimaryRole)
{
whichStat = FileRepGpmonStatType_PrimaryWriteSyscall;
FileRepGpmonStat_OpenRecord(whichStat, &gpmonRecord);
gpmonRecord.size = amount;
} else if (fileRepRole == FileRepMirrorRole)
{
whichStat = FileRepGpmonStatType_MirrorWriteSyscall;
FileRepGpmonStat_OpenRecord(whichStat, &gpmonRecord);
gpmonRecord.size = amount;
}
Assert(FileIsValid(file));
......@@ -1265,11 +1249,11 @@ FileWrite(File file, char *buffer, int amount)
if (returnCode < 0)
return returnCode;
#ifdef FAULT_INJECTOR
#ifdef FAULT_INJECTOR
if (! strcmp(VfdCache[file].fileName, "global/pg_control"))
{
if (FaultInjector_InjectFaultIfSet(
PgControl,
PgControl,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeDataCorruption)
......@@ -1277,11 +1261,11 @@ FileWrite(File file, char *buffer, int amount)
MemSet(buffer, 0, amount);
}
}
if (strstr(VfdCache[file].fileName, "pg_xlog/"))
{
if (FaultInjector_InjectFaultIfSet(
PgXlog,
PgXlog,
DDLNotSpecified,
"" /* databaseName */,
"" /* tableName */) == FaultInjectorTypeDataCorruption)
......@@ -1328,15 +1312,6 @@ retry:
VfdCache[file].seekPos = FileUnknownPos;
}
if (returnCode >= 0)
{
//only include stat if successful
if ((fileRepRole == FileRepPrimaryRole) ||
(fileRepRole == FileRepMirrorRole))
{
FileRepGpmonStat_CloseRecord(whichStat, &gpmonRecord);
}
}
return returnCode;
}
......@@ -1344,18 +1319,6 @@ int
FileSync(File file)
{
int returnCode;
FileRepGpmonRecord_s gpmonRecord;
FileRepGpmonStatType_e whichStat;
if (fileRepRole == FileRepPrimaryRole)
{
whichStat = FileRepGpmonStatType_PrimaryFsyncSyscall;
FileRepGpmonStat_OpenRecord(whichStat, &gpmonRecord);
} else
{
whichStat = FileRepGpmonStatType_MirrorFsyncSyscall;
FileRepGpmonStat_OpenRecord(whichStat, &gpmonRecord);
}
Assert(FileIsValid(file));
DO_DB(elog(LOG, "FileSync: %d (%s)",
......@@ -1368,16 +1331,7 @@ FileSync(File file)
SIMPLE_FAULT_INJECTOR(FileRepFlush);
returnCode = pg_fsync(VfdCache[file].fd);
if (returnCode >= 0)
{
//only include stats if successful
if ((fileRepRole == FileRepPrimaryRole) ||
(fileRepRole == FileRepMirrorRole))
{
FileRepGpmonStat_CloseRecord(whichStat, &gpmonRecord);
}
}
return returnCode;
}
......@@ -1520,10 +1474,10 @@ FileTruncate(File file, int64 offset)
* table data.
*/
returnCode = ftruncate(VfdCache[file].fd, offset);
/* Assume we don't know the file position anymore */
VfdCache[file].seekPos = FileUnknownPos;
return returnCode;
}
......@@ -2197,7 +2151,7 @@ GetTempFilePrefix(char * buf, size_t buflen, const char * fileName)
{
return needlen;
}
snprintf(buf, buflen, "%s/%s_%s",
PG_TEMP_FILES_DIR,
PG_TEMP_FILE_PREFIX,
......
......@@ -190,7 +190,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
size = add_size(size, FileRepResync_ShmemSize());
size = add_size(size, FileRepIpc_ShmemSize());
size = add_size(size, FileRepLog_ShmemSize());
size = add_size(size, FileRepStats_ShmemSize());
}
#ifdef FAULT_INJECTOR
......@@ -387,7 +386,6 @@ CreateSharedMemoryAndSemaphores(bool makePrivate, int port)
FileRepResync_ShmemInit();
FileRepIpc_ShmemInit();
FileRepLog_ShmemInit();
FileRepStats_ShmemInit();
}
#ifdef FAULT_INJECTOR
......
......@@ -1293,64 +1293,6 @@ extern bool FileRepIsBackendSubProcess(FileRepProcessType_e processType);
extern void FileRep_SetFileRepRetry(void);
/************* FILEREPGPMON ********************/
#define FILEREP_GPMON_USE_TIMERS 0
typedef struct FileRepGpmonInfo_s {
gpmon_packet_t gpmonPacket;
int gpsock;
struct sockaddr_storage gpaddr; // Allow for either IPv4 or IPv6
int gpaddr_len; // And remember the length
TimestampTz lastSend;
} FileRepGpmonInfo_s;
typedef enum FileRepGpmonStatType_e {
FileRepGpmonStatType_PrimaryRoundtripTestMsg =0, //COUNT, TIME
FileRepGpmonStatType_PrimaryFsyncShmem, //COUNT, TIME
FileRepGpmonStatType_PrimaryWriteShmem, //COUNT, TIME, SIZE
FileRepGpmonStatType_PrimaryRoundtripFsyncMsg, //COUNT, TIME
FileRepGpmonStatType_PrimaryWriteSyscall, //COUNT, TIME, SIZE
FileRepGpmonStatType_PrimaryFsyncSyscall, //COUNT, TIME
FileRepGpmonStatType_MirrorWriteSyscall, //COUNT, TIME, SIZE
FileRepGpmonStatType_MirrorFsyncSyscall, //COUNT, TIME
FileRepGpmonStatType__EnumerationCount
} FileRepGpmonStatType_e;
typedef struct FileRepGpmonRecord_s {
FileRepGpmonStatType_e whichStat;
/*
could have a union here of different records
for each stat type if we wanted
*/
//should we use gettimeofday and struct timeval instead?
TimestampTz startTime;
TimestampTz endTime;
int32 size;
} FileRepGpmonRecord_s;
void FileRepStats_GpmonInit(void);
void FileRepGpmonStat_OpenRecord(FileRepGpmonStatType_e whichStat,
FileRepGpmonRecord_s *record);
void FileRepGpmonStat_CloseRecord(FileRepGpmonStatType_e whichStat,
FileRepGpmonRecord_s *record);
void
FileRepStats_ShmemInit(void);
Size
FileRepStats_ShmemSize(void);
extern void FileRep_resetSpinLocks(void);
#endif /* CDBFILEREP_H */
......
......@@ -11,7 +11,6 @@ typedef struct gpmon_qexec_t gpmon_qexec_t;
typedef struct gpmon_hello_t gpmon_hello_t;
typedef struct gpmon_metrics_t gpmon_metrics_t;
typedef struct gpmon_seginfo_t gpmon_seginfo_t;
typedef struct gpmon_filerepinfo_t gpmon_filerepinfo_t;
typedef struct gpmon_fsinfokey_t gpmon_fsinfokey_t;
typedef struct gpmon_fsinfo_t gpmon_fsinfo_t;
typedef struct gpmon_query_seginfo_key_t gpmon_query_seginfo_key_t;
......@@ -245,106 +244,6 @@ struct gpmon_seginfo_t {
uint64 dynamic_memory_available; // available memory in bytes,
};
/*
//we could clean up the primary and mirror stats using this basicStat struct
typedef struct gpmon_filerep_basicStat_s
{
uint32 count;
uint32 time_avg;
uint32 time_max;
uint32 size_avg;
uint32 size_max;
} gpmon_filerep_basicStat_s;
*/
/*
* Filerep related statistics
*/
typedef struct gpmon_filerep_primarystats_s
{
//NOTE: 32 bits can store over an hour of microseconds - we will not worry about this
// EVENT: write systemcall on primary
uint32 write_syscall_size_avg;
uint32 write_syscall_size_max;
uint32 write_syscall_time_avg; // microseconds;
uint32 write_syscall_time_max; // microseconds;
uint32 write_syscall_count;
// EVENT: fsync systemcall on primary
uint32 fsync_syscall_time_avg; // microseconds;
uint32 fsync_syscall_time_max; // microseconds;
uint32 fsync_syscall_count;
// EVENT: putting write message into shared memory
uint32 write_shmem_size_avg;
uint32 write_shmem_size_max;
uint32 write_shmem_time_avg; // microseconds;
uint32 write_shmem_time_max; // microseconds;
uint32 write_shmem_count;
// EVENT: putting fsync message into shared memory
uint32 fsync_shmem_time_avg; // microseconds;
uint32 fsync_shmem_time_max; // microseconds;
uint32 fsync_shmem_count;
// EVENT: Roundtrip from sending fsync message to mirror to get ack back on primary
uint32 roundtrip_fsync_msg_time_avg; // microseconds;
uint32 roundtrip_fsync_msg_time_max; // microseconds;
uint32 roundtrip_fsync_msg_count;
// EVENT: Roundtrip from sending test message to mirror to getting back on primary
uint32 roundtrip_test_msg_time_avg; // microseconds;
uint32 roundtrip_test_msg_time_max; // microseconds;
uint32 roundtrip_test_msg_count;
} gpmon_filerep_primarystats_s;
typedef struct gpmon_filerep_mirrorstats_s
{
// EVENT: write systemcall on mirror
uint32 write_syscall_size_avg;
uint32 write_syscall_size_max;
uint32 write_syscall_time_avg; // microseconds;
uint32 write_syscall_time_max; // microseconds;
uint32 write_syscall_count;
// EVENT: fsync systemcall on mirror
uint32 fsync_syscall_time_avg; // microseconds;
uint32 fsync_syscall_time_max; // microseconds;
uint32 fsync_syscall_count;
} gpmon_filerep_mirrorstats_s;
typedef union gpmon_filerep_stats_u {
gpmon_filerep_primarystats_s primary;
gpmon_filerep_mirrorstats_s mirror;
} gpmon_filerep_stats_u;
typedef struct gpmmon_filerep_key_t
{
char primary_hostname[NAMEDATALEN];
uint16 primary_port;
char mirror_hostname[NAMEDATALEN];
uint16 mirror_port;
} gpmmon_filerep_key_t;
typedef struct gpsmon_filerep_key_t
{
gpmmon_filerep_key_t dkey;
bool isPrimary;
} gpsmon_filerep_key_t;
struct gpmon_filerepinfo_t
{
gpsmon_filerep_key_t key;
float elapsedTime_secs;
gpmon_filerep_stats_u stats;
};
/* ------------------------------------------------------------------
HELLO
------------------------------------------------------------------ */
......@@ -373,7 +272,6 @@ enum gpmon_pkttype_t {
GPMON_PKTTYPE_QLOG = 3,
GPMON_PKTTYPE_QEXEC = 4,
GPMON_PKTTYPE_SEGINFO = 5,
GPMON_PKTTYPE_FILEREP = 6,
GPMON_PKTTYPE_QUERY_HOST_METRICS = 7, // query metrics update from a segment such as CPU per query
GPMON_PKTTYPE_FSINFO = 8,
GPMON_PKTTYPE_QUERYSEG = 9,
......@@ -394,7 +292,6 @@ struct gpmon_packet_t {
gpmon_qlog_t qlog;
gpmon_qexec_t qexec;
gpmon_seginfo_t seginfo;
gpmon_filerepinfo_t filerepinfo;
gpmon_fsinfo_t fsinfo;
} u;
};
......
......@@ -1008,12 +1008,6 @@ GpErrorDataFixFields
GpExecIdentity
GpGlobalSequence
GpId
gpmmon_filerep_key_t
gpmon_filerepinfo_t
gpmon_filerep_mirrorstats_s
gpmon_filerep_primarystats_s
gpmon_filerep_stats_u
gpmon_filrep_holder_t
gpmon_hello_t
gpmon_metrics_t
gpmon_packet_t
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册