#-*- coding:utf-8 -*- #!/usr/bin/env python3 import json import sys import signal import readline import os c_title = '\033[1;4;31;42m' # title color c_br = '\033[1;31m' # bold red c_bg = '\033[1;32m' # bold green c_by = '\033[1;33m' # bold yellow c_bb = '\033[1;34m' # bold blue c_bp = '\033[1;35m' # bold purple c_bc = '\033[1;36m' # bold cyan c_bir= '\033[1;3;31m' # * bold italic red c_bib = '\033[1;3;34m' # * bold italic cyan c_bic = '\033[1;3;36m' # bold italic cyan c_e = '\033[0m' # reset def get_parent(parent_log, inp_parent_id): parent_all = {"dyn_first_parent": "动态一层父", "dyn_first_parent_all": "动态一层父所有", "dyn_first_parent_backups": "动态一层备父", "dyn_second_parent": "动态二层父", "first_parent": "一层父", "first_parent_backups": "一层备父", "pre_first_parent": "预部署一层父", "pre_first_parent_backups": "预部署一层备父", "pre_second_parent": "预部署二层父", "pre_second_parent_backups": "预部署二层备父", "second_parent": "二层父", "second_parent_backups": "二层备父"} parent_related = {} with open(parent_log) as obj_parent: parents=json.loads(obj_parent.read()) for parent in parents['result']: if parent['parent_id'] == inp_parent_id: parent_name = parent['parent_name'] print(f"父方案: {parent_name}") for parent_en, parent_cn in parent_all.items(): if parent[parent_en] != '': parent_related[parent[parent_en]] = parent_cn for parent_en, parent_cn in parent_related.items(): print(f"{parent_cn}: {parent_en}") break def get_respool(respool_log, inp_template_id, pool_type): with open(respool_log) as obj_respool: respools=json.loads(obj_respool.read()) for respool in respools['result']: if int(respool['template_id']) == int(inp_template_id): # print(f"{pool_type}: {respool['template_name']}") return (f"{pool_type}: {respool['template_name']}") def domain_info_1(domain_info_log, inp_domain): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 判断是否是重叠域名 multi = len(domain_infos['data']) if multi == 0: print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}") sys.exit(205) overlap = "是" if multi > 1 else "否" inp_index = 1 if multi > 1: print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}") index = 1 flag = 0 # 遍历重叠域名的账号邮箱,需要输入确定的序号 for domain_info in domain_infos['data']: print(f"账号{index} - ", end="") for find_it in domain_info['domains']: if find_it['domain'] == inp_domain: pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}") flag = 0 break flag = 1 if flag == 1: print() flag = 0 index += 1 print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}") # 验证index是合法输入的逻辑 inp_index = input() if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index: inp_index = int(inp_index) else: print(f"{c_br}请输入正确的序号,{c_e}", end="") sys.exit(200) inp_index -= 1 inp_index = inp_index if inp_index != 0 else 0 common_cname = len(domain_infos['data'][inp_index]['domains']) for find_it in range(common_cname): if domain_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain: break common_cname = '是' if common_cname > 1 else '否' common_cnames = [] for domain in domain_infos['data'][inp_index]['domains']: common_cnames.append(domain['domain']) account = domain_infos['data'][inp_index]['domains'][find_it]['account_name'] account_id = domain_infos['data'][inp_index]['domains'][find_it]['account_id'] access_id = domain_infos['data'][inp_index]['domains'][find_it]['access_id'] email = domain_infos['data'][inp_index]['domains'][find_it]['email'] cname = domain_infos['data'][inp_index]['cname'] cname_vendor = domain_infos['data'][inp_index]['access_vendor_cname'] parse_group = domain_infos['data'][inp_index]['parse_group_name'] with open("info.log", 'w', encoding='utf-8') as obj_info: obj_info.write(f"1:{account}\n") obj_info.write(f"2:{email}\n") obj_info.write(f"3:{account_id}\n") obj_info.write(f"4:{access_id}\n") pretty_print3(f"账户: {account}", f"邮箱: {email}", f"accId: {account_id}") pretty_print3(f"Map: {parse_group}", f"accessId: {access_id}", f"重叠域名: {overlap}") pretty_print3(f"合作方: {cname_vendor}", f"CNAME: {cname}", f"是否共享CNAME缓存: {common_cname}") if common_cname == '是': print(f"共享CNAME缓存域名列表: {common_cnames}") if parse_group == '': sys.exit(201) def domain_info_2(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 for acc_info in domain_infos['results']['items']: if acc_info['accountId'] == inp_accid: managerArea = acc_info['managerArea'] platformVipLevel = acc_info['platformVipLevel'] businessLevel = acc_info['businessLevel'] ctYunVipLevel = acc_info['ctYunVipLevel'] clientId = acc_info['clientId'] accountType = acc_info['accountType'] clientInsideName = acc_info['clientInsideName'] maintainAfterName = acc_info['maintainAfterName'] maintainAfterPhone = acc_info['maintainAfterPhone'] maintainAfterEmail = acc_info['maintainAfterEmail'] managerVendor = acc_info['managerVendor'] pretty_print3(f"售后姓名: {maintainAfterName}", f"售后电话: {maintainAfterPhone}", f"售后邮箱: {maintainAfterEmail}") pretty_print3(f"天翼云VIP等级: {ctYunVipLevel}", f"平台VIP等级: {platformVipLevel}", f"客户VIP等级: {businessLevel}") pretty_print3(f"clientId: {clientId}", f"客户内部名称: {clientInsideName}", f"商务渠道: {managerArea}") pretty_print2(f"承载平台: {managerVendor}", f"客户类型: {accountType}") break def domain_info_3(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历重叠域名,使用request id确定唯一的信息 for domain_info in domain_infos['data']['results']: if domain_info['accountId'] == inp_accid: statusName = domain_info['statusName'] ipv6Switch = domain_info['ipv6Switch'] productName = domain_info['productName'] innerTestDomain = domain_info['innerTestDomain'] ipv6Switch = '是' if ipv6Switch == 1 else '否' innerTestDomain = '是' if innerTestDomain == 1 else '否' pretty_print2(f"域名状态: {statusName}", f"是否开启IPv6: {ipv6Switch}") pretty_print2(f"是否内部测试域名: {innerTestDomain}", f"产品类型: {productName}") break def domain_info_4(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 if len(domain_infos['result']) == 0: sys.exit(204) else: for domain_info in domain_infos['result']: if domain_info['account_id'] == inp_accid: # 1. 回源地址 origin = [] for ori in domain_info['origin']: origin.append(ori['role'] + ':' + ori['origin']) # 2. 访问协议 + 端口 http_visit = domain_info['basic_conf']['http_server_port'] if domain_info['http_status'] == 'on' else 'X' https_visit = domain_info['basic_conf']['https_server_port'] if domain_info['https_status'] == 'on' else 'X' url_visit = str(http_visit) + '/' + str(https_visit) # 3. 回源协议 + 端口 https_origin = str(domain_info['basic_conf']['https_origin_port']) http_origin = str(domain_info['basic_conf']['http_origin_port']) if domain_info['backorigin_protocol'] == 'follow_request': url_origin = http_origin + '/' + https_origin elif domain_info['backorigin_protocol'] == 'http': url_origin = http_origin + '/X' elif domain_info['backorigin_protocol'] == 'https': url_origin = 'X/' + https_origin else: print("回源协议除了http/https/follow_request之外,还有第四种方式,请补充...") sys.exit(201) # 4. 证书备注名 cert_name = domain_info['cert_name'] # 6. 预部署资源池 pre_node_list = domain_info['pre_node_list'] off_pool = get_respool("respool.log", pre_node_list, '预部署资源池') # 7. 全局资源池 node_list = domain_info['node_list'] on_pool = get_respool("respool.log", node_list, '全局资源池') # 8. 是否热加载 conf_order_id = domain_info['conf_order_id'] conf_order_id = '否' if conf_order_id == -1 else '是' pretty_print2(f"证书备注名: {cert_name}", f"热加载: {conf_order_id}") pretty_print2(off_pool, on_pool) print(f"回源地址: {origin}") print(f"http/https访问: {url_visit}") print(f"http/https回源: {url_origin}") # 5. 父方案 parent_id parent_id = domain_info['parent_id'] get_parent("parent.log", parent_id) break def domain_info_5(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 for domain_info in domain_infos['result']: if domain_info['account_id'] == inp_accid: with open("info.log", 'w', encoding='utf-8') as obj_info: obj_info.write(f"4:{domain_info['domain_id']}\n") break # 如下accid没用到 def domain_info_6(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_info=json.loads(obj_domain_info.read())['result'] # 推拉流模式 push_stream_domain = '' pull_stream_mode = domain_info['base_conf']['pull_stream_mode'] if pull_stream_mode == 0: pull_stream_mode = "直播拉流(推拉流)" push_stream_domain = domain_info['base_conf']['push_stream_domain'] elif pull_stream_mode == 1: pull_stream_mode = "直播拉流(回源拉流)" else: pull_stream_mode = "直播推流" # 证书备注名 if domain_info['protocol_control']['https_switch'] == 1: cert_name = domain_info['protocol_control']['cert_name'] else: cert_name = '无绑定证书' pretty_print3(f"推拉流模式: {pull_stream_mode}", f"推流域名: {push_stream_domain}", f"证书备注名: {cert_name}") # 预部署资源池 pre_node_list = domain_info['pre_resouce_id'] off_pool = get_respool("respool.log", pre_node_list, '预部署资源池') # 全局资源池 node_list = domain_info['resouce_id'] on_pool = get_respool("respool.log", node_list, '全局资源池') pretty_print2(off_pool, on_pool) # 回源模式 origin_mode = domain_info['base_conf']['origin_mode'] for mode in origin_mode: print(f"回源模式: {mode}") mode_desc = domain_info['base_conf'][f'{mode}_origin'] for ori in mode_desc: for k, v in ori.items(): if v != '': print(f"{k}: {v}") # 父方案 parent_id parent_id = domain_info['parent_id'] get_parent("parent.log", parent_id) def domain_map_info(domain_map_log, flg): with open(domain_map_log) as obj_domain_map_log: map_info=json.loads(obj_domain_map_log.read()) # 判断是否是重叠域名 parse_detail=map_info['parse_detail'] if int(flg) == 0: print('------------------------------分区域解析------------------------------') for item in parse_detail: pretty_print3(item['area_cnname'], item['type'], item['value'], 1) # write to file here print('----------------------------------------------------------------------') else: with open('map.log', 'w') as obj_map_log: for item in parse_detail: obj_map_log.write(f"{item['value']}\n") def map_info(map_info_log, inp_domain): with open(map_info_log) as obj_map_info: map_infos=json.loads(obj_map_info.read()) # 判断是否是重叠域名 multi = len(map_infos['data']) if multi == 0: print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}") sys.exit(205) inp_index = 1 if multi > 1: print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}") index = 1 flag = 0 # 遍历重叠域名的账号邮箱,需要输入确定的序号 for map_info in map_infos['data']: print(f"账号{index} - ", end="") for find_it in map_info['domains']: if find_it['domain'] == inp_domain: pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}") flag = 0 break flag = 1 if flag == 1: print() flag = 0 index += 1 print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}") # 验证index是合法输入的逻辑 inp_index = input() if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index: inp_index = int(inp_index) else: print(f"{c_br}请输入正确的序号,{c_e}", end="") sys.exit(200) inp_index -= 1 inp_index = inp_index if inp_index != 0 else 0 parse_group = map_infos['data'][inp_index]['parse_group_name'] common_cname = len(map_infos['data'][inp_index]['domains']) for find_it in range(common_cname): if map_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain: break account_id = map_infos['data'][inp_index]['domains'][find_it]['account_id'] access_id = map_infos['data'][inp_index]['domains'][find_it]['access_id'] with open("info.log", 'w', encoding='utf-8') as obj_info: obj_info.write(f"3:{account_id}\n") obj_info.write(f"4:{access_id}\n") if parse_group != '': with open("map.log", 'w', encoding='utf-8') as obj_map: obj_map.write(f"{parse_group}\n") else: sys.exit(201) def domain_config_cdn(domain_info_log, inp_accid, domain): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 if len(domain_infos['result']) == 0: sys.exit(204) else: for domain_info in domain_infos['result']: config_json = json.dumps(domain_info) os.environ['config_json'] = config_json os.environ['domain_json'] = domain if domain_info['account_id'] == inp_accid: os.system("echo $config_json > $domain_json") break def domain_config_live(domain_info_log, domain): with open(domain_info_log) as obj_domain_info: domain_info=json.loads(obj_domain_info.read())['result'] config_json = json.dumps(domain_info) os.environ['config_json'] = config_json os.environ['domain_json'] = domain os.system("echo $config_json > $domain_json") def parent_info_4(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 if len(domain_infos['result']) == 0: sys.exit(204) else: for domain_info in domain_infos['result']: if domain_info['account_id'] == inp_accid: # 5. 父方案 parent_id parent_id = domain_info['parent_id'] get_parent_info("parent.log", parent_id) break def parent_info_5(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_infos=json.loads(obj_domain_info.read()) # 遍历账号名称相同的客户,使用request id确定唯一的信息 for domain_info in domain_infos['result']: if domain_info['account_id'] == inp_accid: with open("info.log", 'w', encoding='utf-8') as obj_info: obj_info.write(f"2:{domain_info['domain_id']}\n") break # 如下accid没用到 def parent_info_6(domain_info_log, inp_accid): with open(domain_info_log) as obj_domain_info: domain_info=json.loads(obj_domain_info.read())['result'] # 父方案 parent_id parent_id = domain_info['parent_id'] get_parent_info("parent.log", parent_id) def get_parent_info(parent_log, inp_parent_id): parent_all = ["dyn_first_parent", "dyn_first_parent_all", "dyn_first_parent_backups", "dyn_second_parent", "first_parent", "first_parent_backups", "pre_first_parent", "pre_first_parent_backups", "pre_second_parent", "pre_second_parent_backups", "second_parent", "second_parent_backups"] parent_related = {} with open(parent_log) as obj_parent: parents=json.loads(obj_parent.read()) for parent in parents['result']: if parent['parent_id'] == inp_parent_id: parent_name = parent['parent_name'] index = 1 for parent_en in parent_all: if parent[parent_en] != '': with open("cmap", 'a', encoding='utf-8') as obj_cmap: obj_cmap.write(f"{index}. {parent[parent_en]}\n") index += 1 break def quit(signum, frame): print("Bye!") sys.exit(205) def pretty_print2(col_1, col_2): len_1 = len(col_1) len_2 = len(col_2) len_1_utf8 = len(col_1.encode('utf-8')) len_2_utf8 = len(col_2.encode('utf-8')) size_1 = 48 - int((len_1_utf8 - len_1) / 2) size_2 = 40 - int((len_2_utf8 - len_2) / 2) print(f"%-{size_1}s%-{size_2}s" % (col_1, col_2)) def pretty_print3(col_1, col_2, col_3, col_4=0): len_1 = len(col_1) len_2 = len(col_2) len_3 = len(col_3) len_1_utf8 = len(col_1.encode('utf-8')) len_2_utf8 = len(col_2.encode('utf-8')) len_3_utf8 = len(col_3.encode('utf-8')) size_1 = 48- int((len_1_utf8 - len_1) / 2) size_2 = 40 - int((len_2_utf8 - len_2) / 2) size_3 = 30 - int((len_2_utf8 - len_2) / 2) if col_4 == 0: print(f"%-{size_1}s%-{size_2}s%-{size_3}s" % (col_1, col_2, col_3)) else: size_1 = 16- int((len_1_utf8 - len_1) / 2) size_2 = 10 - int((len_2_utf8 - len_2) / 2) size_3 = 60 - int((len_2_utf8 - len_2) / 2) print(f"%-{size_1}s%-{size_2}s%-{size_3}s" % (col_1, col_2, col_3)) def pretty_print_data(width: list, cols: list): for i in range(len(cols)): len_text = len(cols[i]) len_utf8 = len(cols[i].encode('utf-8')) len_size = width[i] - int((len_utf8 - len_text) / 2) if i == 8: if float(cols[i]) < 10: print(f"{c_br}%-{len_size}s{c_e}" % (cols[i]), end='') elif float(cols[i]) < 30: print(f"{c_by}%-{len_size}s{c_e}" % (cols[i]), end='') else: print(f"{c_bg}%-{len_size}s{c_e}" % (cols[i]), end='') else: print(f"%-{len_size}s" % (cols[i]), end='') print() def pretty_print_title(width: list, cols: list): for i in range(len(cols)): len_text = len(cols[i]) len_utf8 = len(cols[i].encode('utf-8')) len_size = width[i] - int((len_utf8 - len_text) / 2) print(f"{c_title}%-{len_size}s{c_e}" % (cols[i]), end='') print() def fmt_print_global(res_map): title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"] width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10] pretty_print_title(width, title) with open(res_map) as obj_res_map: lines = obj_res_map.readlines() count = 1 for line in lines: pretty_print_data(width, line.strip().split()) count += 1 if count % 25 == 0: pretty_print_title(width, title) def fmt_print_partial(res_map, view, query, domain, domain_map): if os.path.getsize(view): title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"] width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10] pretty_print_title(width, title) with open(res_map) as obj_res_map, open(view) as obj_view: views = obj_view.readlines() lines = obj_res_map.readlines() count = 1 for view_s in views: for line in lines: c_line = line.strip().split() if c_line[2] == view_s.strip(): pretty_print_data(width, c_line) count += 1 if count % 25 == 0: pretty_print_title(width, title) if count == 1: print(f"{c_br}域名{domain}的解析组{domain_map}中,不存在{query}地区的覆盖节点,请确认。{c_e}\n") sys.exit(206) else: print(f"{c_br}请按照规则,输入正确的查询条件,退出...{c_e}") sys.exit(202) def main(): option = sys.argv[1] if option == '--domain_info_1': domain_info_log = sys.argv[2] inp_domain = sys.argv[3] domain_info_1(domain_info_log, inp_domain) elif option == '--domain_info_2': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain_info_2(domain_info_log, inp_accid) elif option == '--domain_info_3': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain_info_3(domain_info_log, inp_accid) elif option == '--domain_info_4': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain_info_4(domain_info_log, inp_accid) elif option == '--domain_info_5': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain_info_5(domain_info_log, inp_accid) elif option == '--domain_info_6': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain_info_6(domain_info_log, inp_accid) elif option == '--domain_map_info': domain_map_log = sys.argv[2] flg = sys.argv[3] domain_map_info(domain_map_log, flg) elif option == '--map_info': map_info_log = sys.argv[2] inp_accid = sys.argv[3] map_info(map_info_log, inp_accid) elif option == '--format-global': res_map = sys.argv[2] fmt_print_global(res_map) elif option == '--format-partial': query = sys.argv[2] view = sys.argv[3] res_map = sys.argv[4] domain = sys.argv[5] domain_map = sys.argv[6] fmt_print_partial(res_map, view, query, domain, domain_map) elif option == '--domain_config_cdn': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] domain = sys.argv[4] domain_config_cdn(domain_info_log, inp_accid, domain) elif option == '--domain_config_live': domain_info_log = sys.argv[2] domain = sys.argv[3] domain_config_live(domain_info_log, domain) elif option == '--parent_info_4': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] parent_info_4(domain_info_log, inp_accid) elif option == '--parent_info_5': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] parent_info_5(domain_info_log, inp_accid) elif option == '--parent_info_6': domain_info_log = sys.argv[2] inp_accid = sys.argv[3] parent_info_6(domain_info_log, inp_accid) if __name__ == "__main__": signal.signal(signal.SIGINT, quit) main()