LOG_ERROR("cdc_start_tstamp_ns is invalid",KR(ret),K(cdc_start_tstamp_ns));
}
// Get the DDL processing progress first, because the DDL is the producer of the data partition, and getting it first will ensure that the overall progress is not reverted
// Note: the progress value should not be invalid
...
...
@@ -794,8 +802,15 @@ int ObLogFetcher::next_heartbeat_timestamp_(int64_t &heartbeat_tstamp, const int
(desc,results)=query_cur.exec_query("""select count(1) as unsync_cnt from __all_virtual_clog_stat where is_in_sync = 0 and is_offline = 0 and replica_type != 16""")
(desc,results)=query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
(desc,results)=query_cur.exec_query("""select 1 from v$sysstat where name = 'is mini mode' and value = '1' and con_id = 1 limit 1""")
iflen(results)>0:
# mini部署的集群,租户规格可以很小,这里跳过检查
pass
else:
(desc,results)=query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=0 and b.min_memory < 5368709120""")
ifresults[0][0]>0:
raiseMyError('{0} tenant resource pool unit config is less than 5G, please check'.format(results[0][0]))
(desc,results)=query_cur.exec_query("""select count(*) from oceanbase.__all_resource_pool a, oceanbase.__all_unit_config b where a.unit_config_id = b.unit_config_id and b.unit_config_id != 100 and a.replica_type=5 and b.min_memory < 2147483648""")
ifresults[0][0]>0:
raiseMyError('{0} tenant logonly resource pool unit config is less than 2G, please check'.format(results[0][0]))
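# 9. Check whether any log-only (L) replicas are placed in Full-type units
# 2020-12-31: this check was removed because external users rely on L replicas while L-type unit support is not yet mature.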
# 10. Check whether each tenant's partition count exceeds its memory limit
# NOTE(review): this excerpt does not show the statements between the three queries and the
# final raise/break (line, uline and mpline are not assigned in the visible text, and the loop
# that owns `break` is missing) -- confirm against the full file before changing this function.
defcheck_tenant_part_num(query_cur):
# Count the partitions each tenant has on each server
(desc,res_part_num)=query_cur.exec_query("""select svr_ip, svr_port, table_id >> 40 as tenant_id, count(*) as part_num from __all_virtual_clog_stat group by 1,2,3 order by 1,2,3""")
# Fetch each tenant's max_memory on each server (one row per unit, with the pool's replica_type)
(desc,res_unit_memory)=query_cur.exec_query("""select u.svr_ip, u.svr_port, t.tenant_id, uc.max_memory, p.replica_type from __all_unit u, __All_resource_pool p, __all_tenant t, __all_unit_config uc where p.resource_pool_id = u.resource_pool_id and t.tenant_id = p.tenant_id and p.unit_config_id = uc.unit_config_id""")
# Query each server's memstore_limit_percentage
(desc,res_svr_memstore_percent)=query_cur.exec_query("""select svr_ip, svr_port, name, value from __all_virtual_sys_parameter_stat where name = 'memstore_limit_percentage'""")
raiseMyError('{0} {1} {2} exceed tenant partition num limit, please check'.format(line,uline,mpline))
break
logging.info('check tenant partition num success')
# 11. Check for observers that hold a tenant's partitions but have no unit for that tenant
def check_tenant_resource(query_cur):
  """Fail if any (tenant_id, svr_ip, svr_port) has partitions but no matching unit.

  Args:
    query_cur: object whose exec_query(sql) returns a (description, rows) pair.

  Raises:
    MyError: for the first row returned by the query, i.e. the first
      (tenant_id, svr_ip, svr_port) listed in __all_virtual_partition_info that
      has no corresponding __all_unit / __all_resource_pool entry.
  """
  _, orphan_rows = query_cur.exec_query("""select tenant_id, svr_ip, svr_port from __all_virtual_partition_info where (tenant_id, svr_ip, svr_port) not in (select tenant_id, svr_ip, svr_port from __all_unit, __all_resource_pool where __all_unit.resource_pool_id = __all_resource_pool.resource_pool_id group by tenant_id, svr_ip, svr_port) group by tenant_id, svr_ip, svr_port""")
  # Any returned row is a violation; the first one aborts the check.
  for line in orphan_rows:
    raise MyError('{0} tenant unit not exist but partition exist'.format(line))
  logging.info("check tenant resource success")
# 12. Check that the index on system table __all_table_history is in effect
def check_sys_index_status(query_cur):
  """Verify the __all_table_history index row is present with the expected status.

  Runs a count(*) over __all_table restricted to one hard-coded
  (data_table_id, table_id) pair with index_type = 1 and index_status = 2.
  NOTE(review): the meaning of those literal ids/status codes is not visible
  here -- presumably the index table and its "available" status; confirm.

  Args:
    query_cur: object whose exec_query(sql) returns a (description, rows) pair.

  Raises:
    MyError: if the query does not return exactly one row, or the count is not 1.
  """
  _, results = query_cur.exec_query("""select count(*) as cnt from __all_table where data_table_id = 1099511627890 and table_id = 1099511637775 and index_type = 1 and index_status = 2""")
  if len(results) != 1 or results[0][0] != 1:
    raise MyError("""__all_table_history's index status not valid""")
  logging.info("""check __all_table_history's index status success""")
# 14. Check that no read-only zone exists before the upgrade
def check_readonly_zone(query_cur):
  """Fail if __all_zone contains any row with name='zone_type' and info='ReadOnly'.

  Args:
    query_cur: object whose exec_query(sql) returns a (description, rows) pair;
      the first column of the first row is read as the count.

  Raises:
    MyError: if the count is non-zero.
  """
  _, results = query_cur.exec_query("""select count(*) from __all_zone where name='zone_type' and info='ReadOnly'""")
  readonly_zone_cnt = results[0][0]
  if readonly_zone_cnt != 0:
    raise MyError("""check_readonly_zone failed, ob2.2 not support readonly_zone""")
  logging.info("""check_readonly_zone success""")
raiseMyError('{0} tenant is merging, please check'.format(results[0][0]))
(desc,results)=query_cur.exec_query("""select distinct value from __all_virtual_sys_parameter_stat where name='min_observer_version'""")
iflen(results)!=1:
raiseMyError('distinct observer version exist')
elifcmp(results[0][0],"2.2.40")>=0:
# 最小版本大于等于2.2.40,忽略检查
logging.info('cluster version ({0}) is greate than or equal to 2.2.40, need not check high_priority_net_thread_count'.format(results[0][0]))
else:
# 低于224版本的需要确认配置项值为0
logging.info('cluster version is ({0}), need check high_priority_net_thread_count'.format(results[0][0]))
(desc,results)=query_cur.exec_query("""select count(*) from __all_sys_parameter where name like 'high_priority_net_thread_count' and value not like '0'""")
ifresults[0][0]>0:
raiseMyError('high_priority_net_thread_count is greater than 0, unexpected')
(desc,results)=query_cur.exec_query("""select count(1) from __all_virtual_sys_parameter_stat where name='micro_block_merge_verify_level' and value < 2""")
ifresults[0][0]!=0:
raiseMyError("""unexpected micro_block_merge_verify_level detected, upgrade is not allowed temporarily""")
(desc,results)=query_cur.exec_query("""select count(*) from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'""");
iflen(results)!=1:
raiseMyError('failed to get protection mode')
elifresults[0][0]==0:
logging.info('no need to check protection mode')
else:
(desc,results)=query_cur.exec_query("""select column_value from __all_core_table where table_name = '__all_cluster' and column_name = 'protection_mode'""");
iflen(results)!=1:
raiseMyError('failed to get protection mode')
elifcmp(results[0][0],'0')!=0:
raiseMyError('cluster not maximum performance protection mode before update not allowed, protecion_mode={0}'.format(results[0][0]))
else:
logging.info('cluster protection mode legal before update!')
# 27. Check that there are no restore jobs
def check_restore_job_exist(query_cur):
  """Fail if any restore job is still recorded in CDB_OB_RESTORE_PROGRESS.

  The excerpt this block came from issued two queries back to back
  (__all_restore_job, then CDB_OB_RESTORE_PROGRESS) and only ever read the
  second result; the first was a dead store. Only the query whose result is
  actually used is kept.
  NOTE(review): assumes the __all_restore_job line was the superseded variant --
  confirm against the target schema version.

  Args:
    query_cur: object whose exec_query(sql) returns a (description, rows) pair.

  Raises:
    MyError: if the result is not exactly one row with one column, or if the
      count in that cell is non-zero.
  """
  _, results = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
  if len(results) != 1 or len(results[0]) != 1:
    raise MyError('failed to restore job cnt')
  elif results[0][0] != 0:
    raise MyError("""still has restore job, upgrade is not allowed temporarily""")
  logging.info('check restore job success')
# 28. Check whether the sys tenant's system-table leaders are scattered across servers
def check_sys_table_leader(query_cur):
  """Verify that __all_root_table and the sys tenant's tables share one leader server.

  Steps, each driven by one query through query_cur:
    1. Read the single (svr_ip, svr_port) with role = 1 from
       __all_virtual_core_meta_table; the error text calls this the rs leader.
    2. Require exactly one role = 1 row on that server in
       __all_virtual_core_root_table.
    3. Require zero role = 1 rows for tenant_id = 1 on any other server.

  Args:
    query_cur: object whose exec_query(sql) returns a (description, rows) pair.

  Raises:
    MyError: if any query returns an unexpected shape, or a count differs from
      the value required above.
  """
  _, results = query_cur.exec_query("""select svr_ip, svr_port from oceanbase.__all_virtual_core_meta_table where role = 1""")
  if len(results) != 1 or len(results[0]) != 2:
    raise MyError('failed to rs leader')
  # The guard above always raises, so the remaining checks need no else-branch.
  svr_ip = results[0][0]
  svr_port = results[0][1]

  # check __all_root_table's leader
  # (the SQL text keeps the embedded newline of the original two-line literal)
  root_table_sql = ("select count(1) from oceanbase.__all_virtual_core_root_table\n"
                    "where role = 1 and svr_ip = '{0}' and svr_port = '{1}'").format(svr_ip, svr_port)
  _, results = query_cur.exec_query(root_table_sql)
  if len(results) != 1 or len(results[0]) != 1:
    raise MyError('failed to __all_root_table leader')
  elif results[0][0] != 1:
    raise MyError("""__all_root_table should be {0}:{1}""".format(svr_ip, svr_port))

  # check sys tables' leader
  sys_tables_sql = ("select count(1) from oceanbase.__all_virtual_core_root_table\n"
                    "where tenant_id = 1 and role = 1 and (svr_ip != '{0}' or svr_port != '{1}')").format(svr_ip, svr_port)
  _, results = query_cur.exec_query(sys_tables_sql)
  if len(results) != 1 or len(results[0]) != 1:
    # NOTE(review): message is identical to the __all_root_table check above; kept
    # unchanged because callers/log scrapers may match on it.
    raise MyError('failed to __all_root_table leader')
  elif results[0][0] != 0:
    raise MyError("""sys tables'leader should be {0}:{1}""".format(svr_ip, svr_port))
sql="select distinct value = '{0}' from oceanbase.__all_virtual_sys_parameter_stat where name='min_observer_version'".format(upgrade_params.new_version)
sql="select distinct value = '{0}' from oceanbase.GV$OB_PARAMETERS where name='min_observer_version'".format(upgrade_params.new_version)