scripts/old/ctc/get_infos.py
2023-06-05 23:04:30 +08:00

604 lines
26 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#-*- coding:utf-8 -*-
#!/usr/bin/env python3
import json
import sys
import signal
import readline
import os
# ANSI SGR escape sequences used to colorize terminal output.
c_title = '\033[1;4;31;42m' # title color: bold, underlined, red on green background
c_br = '\033[1;31m' # bold red
c_bg = '\033[1;32m' # bold green
c_by = '\033[1;33m' # bold yellow
c_bb = '\033[1;34m' # bold blue
c_bp = '\033[1;35m' # bold purple
c_bc = '\033[1;36m' # bold cyan
c_bir= '\033[1;3;31m' # * bold italic red
c_bib = '\033[1;3;34m' # * bold italic blue (SGR 34 is blue, not cyan)
c_bic = '\033[1;3;36m' # bold italic cyan
c_e = '\033[0m' # reset
def get_parent(parent_log, inp_parent_id):
    """Print the parent scheme with id ``inp_parent_id`` and its non-empty layers.

    Args:
        parent_log: Path of a UTF-8 JSON file whose ``result`` list holds one
            dict per parent scheme.
        inp_parent_id: Value compared against each entry's ``parent_id``.

    Only the first matching entry is printed. Nothing is printed when no entry
    matches. Layers that share the same value are printed once, labelled with
    the last layer (in ``parent_all`` order) that carries that value.
    """
    parent_all = {"dyn_first_parent": "动态一层父", "dyn_first_parent_all": "动态一层父所有", "dyn_first_parent_backups": "动态一层备父", "dyn_second_parent": "动态二层父", "first_parent": "一层父", "first_parent_backups": "一层备父", "pre_first_parent": "预部署一层父", "pre_first_parent_backups": "预部署一层备父", "pre_second_parent": "预部署二层父", "pre_second_parent_backups": "预部署二层备父", "second_parent": "二层父", "second_parent_backups": "二层备父"}
    # JSON is UTF-8 by specification; do not depend on the locale's default encoding.
    with open(parent_log, encoding='utf-8') as obj_parent:
        parents = json.load(obj_parent)
    for parent in parents['result']:
        if parent['parent_id'] != inp_parent_id:
            continue
        print(f"父方案: {parent['parent_name']}")
        # Keyed by the layer value so duplicated values collapse into one line.
        parent_related = {}
        for parent_en, parent_cn in parent_all.items():
            # A layer key absent from the entry is treated like an empty layer.
            layer_value = parent.get(parent_en, '')
            if layer_value != '':
                parent_related[layer_value] = parent_cn
        for layer_value, parent_cn in parent_related.items():
            print(f"{parent_cn}: {layer_value}")
        break
def get_respool(respool_log, inp_template_id, pool_type):
    """Return ``"<pool_type>: <template_name>"`` for a resource-pool template id.

    Args:
        respool_log: Path of a UTF-8 JSON file whose ``result`` list holds dicts
            with ``template_id`` and ``template_name``.
        inp_template_id: Template id to look up; compared as an integer.
        pool_type: Label placed in front of the template name.

    Returns:
        The label followed by the matching template name. When the id is not
        an integer (for example empty or ``None``) or no template matches, the
        label is returned with an empty name, so callers that format the result
        always receive a string.
    """
    with open(respool_log, encoding='utf-8') as obj_respool:
        respools = json.load(obj_respool)
    try:
        wanted_id = int(inp_template_id)
    except (TypeError, ValueError):
        # No usable id to look up.
        return f"{pool_type}: "
    for respool in respools['result']:
        if int(respool['template_id']) == wanted_id:
            return f"{pool_type}: {respool['template_name']}"
    return f"{pool_type}: "
def domain_info_1(domain_info_log, inp_domain):
    """Print account and CNAME details for ``inp_domain`` and save ids to ``info.log``.

    Reads the JSON document in ``domain_info_log``. When its ``data`` list has
    more than one entry, the candidates are listed and a 1-based index is read
    from stdin. The account name, email, account id and access id of the
    selected entry are written to ``info.log`` as ``1:`` .. ``4:`` lines, and a
    summary is printed with ``pretty_print3``.

    Exits with status 205 when ``data`` is empty, 200 when the entered index is
    not valid, and 201 when the selected entry has an empty ``parse_group_name``.
    """
    with open(domain_info_log) as obj_domain_info:
        domain_infos=json.loads(obj_domain_info.read())
        # Check whether this is an overlapping domain (more than one entry in 'data').
        multi = len(domain_infos['data'])
        if multi == 0:
            print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}")
            sys.exit(205)
        # NOTE(review): both branches are '' in this copy; the original yes/no
        # markers may have been stripped (the hosting page warns about ambiguous
        # Unicode characters) -- confirm against the repository.
        overlap = "" if multi > 1 else ""
        inp_index = 1
        if multi > 1:
            print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}")
            index = 1
            flag = 0
            # List account/email for each overlapping entry; the user must pick one by index.
            for domain_info in domain_infos['data']:
                print(f"账号{index} - ", end="")
                for find_it in domain_info['domains']:
                    if find_it['domain'] == inp_domain:
                        pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}")
                        flag = 0
                        break
                    flag = 1
                # flag == 1 means no domain in this entry matched: end the "账号N - " line.
                if flag == 1:
                    print()
                    flag = 0
                index += 1
            print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}")
            # Validate the entered index: digits only, within 1 .. number of entries.
            inp_index = input()
            if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index:
                inp_index = int(inp_index)
            else:
                print(f"{c_br}请输入正确的序号,{c_e}", end="")
                sys.exit(200)
        # Convert the 1-based choice (or the default of 1) to a list index.
        inp_index -= 1
        # This assignment leaves inp_index unchanged (both branches yield the same value).
        inp_index = inp_index if inp_index != 0 else 0
        common_cname = len(domain_infos['data'][inp_index]['domains'])
        # Locate inp_domain in the selected entry; if nothing matches, find_it
        # is left at the last index.
        for find_it in range(common_cname):
            if domain_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain:
                break
        # NOTE(review): both branches are '' in this copy (markers presumably
        # lost), so the `common_cname == ''` check below is always true -- confirm.
        common_cname = '' if common_cname > 1 else ''
        common_cnames = []
        for domain in domain_infos['data'][inp_index]['domains']:
            common_cnames.append(domain['domain'])
        account = domain_infos['data'][inp_index]['domains'][find_it]['account_name']
        account_id = domain_infos['data'][inp_index]['domains'][find_it]['account_id']
        access_id = domain_infos['data'][inp_index]['domains'][find_it]['access_id']
        email = domain_infos['data'][inp_index]['domains'][find_it]['email']
        cname = domain_infos['data'][inp_index]['cname']
        cname_vendor = domain_infos['data'][inp_index]['access_vendor_cname']
        parse_group = domain_infos['data'][inp_index]['parse_group_name']
        # Save the ids as numbered lines in info.log (the file is overwritten).
        with open("info.log", 'w', encoding='utf-8') as obj_info:
            obj_info.write(f"1:{account}\n")
            obj_info.write(f"2:{email}\n")
            obj_info.write(f"3:{account_id}\n")
            obj_info.write(f"4:{access_id}\n")
        pretty_print3(f"账户: {account}", f"邮箱: {email}", f"accId: {account_id}")
        pretty_print3(f"Map: {parse_group}", f"accessId: {access_id}", f"重叠域名: {overlap}")
        pretty_print3(f"合作方: {cname_vendor}", f"CNAME: {cname}", f"是否共享CNAME缓存: {common_cname}")
        if common_cname == '':
            print(f"共享CNAME缓存域名列表: {common_cnames}")
        # An empty parse group name ends the run with status 201.
        if parse_group == '':
            sys.exit(201)
def domain_info_2(domain_info_log, inp_accid):
    """Print after-sales contact, VIP levels and client details for one account.

    Loads the JSON document at ``domain_info_log``, takes the first entry of
    ``results.items`` whose ``accountId`` equals ``inp_accid`` and prints its
    fields with ``pretty_print3`` / ``pretty_print2``. Prints nothing when no
    entry matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    # Pick the single entry that belongs to the requested account id.
    account = next(
        (entry for entry in payload['results']['items']
         if entry['accountId'] == inp_accid),
        None,
    )
    if account is None:
        return
    # Read every field up front so a missing key fails before anything is printed.
    wanted = (
        'managerArea', 'platformVipLevel', 'businessLevel', 'ctYunVipLevel',
        'clientId', 'accountType', 'clientInsideName', 'maintainAfterName',
        'maintainAfterPhone', 'maintainAfterEmail', 'managerVendor',
    )
    field = {key: account[key] for key in wanted}
    pretty_print3(
        f"售后姓名: {field['maintainAfterName']}",
        f"售后电话: {field['maintainAfterPhone']}",
        f"售后邮箱: {field['maintainAfterEmail']}",
    )
    pretty_print3(
        f"天翼云VIP等级: {field['ctYunVipLevel']}",
        f"平台VIP等级: {field['platformVipLevel']}",
        f"客户VIP等级: {field['businessLevel']}",
    )
    pretty_print3(
        f"clientId: {field['clientId']}",
        f"客户内部名称: {field['clientInsideName']}",
        f"商务渠道: {field['managerArea']}",
    )
    pretty_print2(
        f"承载平台: {field['managerVendor']}",
        f"客户类型: {field['accountType']}",
    )
def domain_info_3(domain_info_log, inp_accid):
    """Print status, IPv6 switch, inner-test flag and product type of a domain.

    Loads the JSON document at ``domain_info_log`` and uses the first entry of
    ``data.results`` whose ``accountId`` equals ``inp_accid``. Prints nothing
    when no entry matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    # An overlapping domain has several entries; the account id picks the right one.
    record = next(
        (entry for entry in payload['data']['results']
         if entry['accountId'] == inp_accid),
        None,
    )
    if record is None:
        return
    status_name = record['statusName']
    ipv6_flag = record['ipv6Switch']
    product_name = record['productName']
    inner_test_flag = record['innerTestDomain']
    # NOTE(review): the yes/no markers are '' on both branches in this copy of
    # the file; they may have been stripped -- confirm against the repository.
    if ipv6_flag == 1:
        ipv6_text = ''
    else:
        ipv6_text = ''
    if inner_test_flag == 1:
        inner_test_text = ''
    else:
        inner_test_text = ''
    pretty_print2(f"域名状态: {status_name}", f"是否开启IPv6: {ipv6_text}")
    pretty_print2(f"是否内部测试域名: {inner_test_text}", f"产品类型: {product_name}")
def domain_info_4(domain_info_log, inp_accid):
    """Print the CDN configuration summary of the domain owned by ``inp_accid``.

    Loads the JSON document at ``domain_info_log`` and uses the first entry of
    ``result`` whose ``account_id`` equals ``inp_accid``. Prints certificate
    name, hot-reload marker, resource pools (looked up in ``respool.log``),
    origin addresses, access and back-to-origin ports, then the parent scheme
    (looked up in ``parent.log``).

    Exits with status 204 when ``result`` is empty and with 201 when
    ``backorigin_protocol`` is not ``follow_request``, ``http`` or ``https``.
    Prints nothing when no entry matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    records = payload['result']
    if len(records) == 0:
        sys.exit(204)
    record = next(
        (entry for entry in records if entry['account_id'] == inp_accid),
        None,
    )
    if record is None:
        return

    # 1. Origin addresses, as "role:origin".
    origins = [src['role'] + ':' + src['origin'] for src in record['origin']]

    # 2. Access ports; 'X' marks a protocol that is switched off.
    if record['http_status'] == 'on':
        http_port = record['basic_conf']['http_server_port']
    else:
        http_port = 'X'
    if record['https_status'] == 'on':
        https_port = record['basic_conf']['https_server_port']
    else:
        https_port = 'X'
    visit_ports = f"{http_port}/{https_port}"

    # 3. Back-to-origin ports; 'X' marks the protocol that is not used.
    origin_https = str(record['basic_conf']['https_origin_port'])
    origin_http = str(record['basic_conf']['http_origin_port'])
    protocol = record['backorigin_protocol']
    if protocol == 'follow_request':
        origin_ports = origin_http + '/' + origin_https
    elif protocol == 'http':
        origin_ports = origin_http + '/X'
    elif protocol == 'https':
        origin_ports = 'X/' + origin_https
    else:
        print("回源协议除了http/https/follow_request之外还有第四种方式请补充...")
        sys.exit(201)

    # 4. Certificate remark name.
    certificate = record['cert_name']
    # 6. Pre-deployment resource pool.
    pre_pool = get_respool("respool.log", record['pre_node_list'], '预部署资源池')
    # 7. Global resource pool.
    global_pool = get_respool("respool.log", record['node_list'], '全局资源池')
    # 8. Hot-reload marker.
    # NOTE(review): both branches are '' in this copy of the file; the markers
    # may have been stripped -- confirm against the repository.
    if record['conf_order_id'] == -1:
        hot_reload = ''
    else:
        hot_reload = ''

    pretty_print2(f"证书备注名: {certificate}", f"热加载: {hot_reload}")
    pretty_print2(pre_pool, global_pool)
    print(f"回源地址: {origins}")
    print(f"http/https访问: {visit_ports}")
    print(f"http/https回源: {origin_ports}")
    # 5. Parent scheme, printed last.
    get_parent("parent.log", record['parent_id'])
def domain_info_5(domain_info_log, inp_accid):
    """Write the ``domain_id`` of the matching account to ``info.log`` as ``4:<id>``.

    Uses the first entry of ``result`` in the JSON document at
    ``domain_info_log`` whose ``account_id`` equals ``inp_accid``. ``info.log``
    is overwritten; it is left untouched when no entry matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    # The account id picks the single entry that belongs to the caller.
    record = next(
        (entry for entry in payload['result'] if entry['account_id'] == inp_accid),
        None,
    )
    if record is None:
        return
    with open("info.log", 'w', encoding='utf-8') as info_file:
        info_file.write(f"4:{record['domain_id']}\n")
# inp_accid is accepted but not used by this function.
def domain_info_6(domain_info_log, inp_accid):
    """Print the live-streaming configuration stored under ``result``.

    Prints the stream mode, push domain and certificate name, the two resource
    pools (looked up in ``respool.log``), every origin mode with its non-empty
    settings, and finally the parent scheme (looked up in ``parent.log``).
    """
    with open(domain_info_log) as log_file:
        live_conf = json.load(log_file)['result']

    # Stream mode: 0 = push and pull, 1 = pull from origin, anything else = push.
    push_domain = ''
    mode_code = live_conf['base_conf']['pull_stream_mode']
    if mode_code == 0:
        mode_text = "直播拉流(推拉流)"
        push_domain = live_conf['base_conf']['push_stream_domain']
    elif mode_code == 1:
        mode_text = "直播拉流(回源拉流)"
    else:
        mode_text = "直播推流"

    # Certificate remark name; only read when HTTPS is switched on.
    protocol_control = live_conf['protocol_control']
    if protocol_control['https_switch'] == 1:
        certificate = protocol_control['cert_name']
    else:
        certificate = '无绑定证书'
    pretty_print3(
        f"推拉流模式: {mode_text}",
        f"推流域名: {push_domain}",
        f"证书备注名: {certificate}",
    )

    # Pre-deployment pool first, then the global pool.
    pre_pool = get_respool("respool.log", live_conf['pre_resouce_id'], '预部署资源池')
    global_pool = get_respool("respool.log", live_conf['resouce_id'], '全局资源池')
    pretty_print2(pre_pool, global_pool)

    # Origin modes: each mode name has a "<mode>_origin" list of setting dicts.
    for mode in live_conf['base_conf']['origin_mode']:
        print(f"回源模式: {mode}")
        for setting in live_conf['base_conf'][f'{mode}_origin']:
            for key, value in setting.items():
                if value == '':
                    continue
                print(f"{key}: {value}")

    # Parent scheme.
    get_parent("parent.log", live_conf['parent_id'])
def domain_map_info(domain_map_log, flg):
    """Show or export the per-area resolution records of a parse group.

    When ``int(flg)`` is 0, each ``parse_detail`` item is printed as a compact
    three-column row between two separator lines. Otherwise the ``value`` of
    each item is written to ``map.log``, one per line, overwriting the file.
    """
    with open(domain_map_log) as log_file:
        details = json.load(log_file)['parse_detail']

    if int(flg) != 0:
        with open('map.log', 'w') as map_file:
            map_file.writelines(f"{entry['value']}\n" for entry in details)
        return

    print('------------------------------分区域解析------------------------------')
    for entry in details:
        # The trailing 1 selects the compact column layout of pretty_print3.
        pretty_print3(entry['area_cnname'], entry['type'], entry['value'], 1)
    print('----------------------------------------------------------------------')
def map_info(map_info_log, inp_domain):
    """Resolve the parse group of ``inp_domain`` and save it to ``map.log``.

    Reads the JSON document in ``map_info_log``. When its ``data`` list has
    more than one entry, the candidates are listed and a 1-based index is read
    from stdin. The account id and access id of the selected entry are written
    to ``info.log`` as ``3:`` and ``4:`` lines; a non-empty parse group name is
    written to ``map.log``.

    Exits with status 205 when ``data`` is empty, 200 when the entered index is
    not valid, and 201 when the parse group name is empty.
    """
    with open(map_info_log) as obj_map_info:
        map_infos=json.loads(obj_map_info.read())
        # Check whether this is an overlapping domain (more than one entry in 'data').
        multi = len(map_infos['data'])
        if multi == 0:
            print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}")
            sys.exit(205)
        inp_index = 1
        if multi > 1:
            print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}")
            index = 1
            flag = 0
            # List account/email for each overlapping entry; the user must pick one by index.
            # Note: the loop variable reuses the function's own name as a local.
            for map_info in map_infos['data']:
                print(f"账号{index} - ", end="")
                for find_it in map_info['domains']:
                    if find_it['domain'] == inp_domain:
                        pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}")
                        flag = 0
                        break
                    flag = 1
                # flag == 1 means no domain in this entry matched: end the "账号N - " line.
                if flag == 1:
                    print()
                    flag = 0
                index += 1
            print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}")
            # Validate the entered index: digits only, within 1 .. number of entries.
            inp_index = input()
            if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index:
                inp_index = int(inp_index)
            else:
                print(f"{c_br}请输入正确的序号,{c_e}", end="")
                sys.exit(200)
        # Convert the 1-based choice (or the default of 1) to a list index.
        inp_index -= 1
        # This assignment leaves inp_index unchanged (both branches yield the same value).
        inp_index = inp_index if inp_index != 0 else 0
        parse_group = map_infos['data'][inp_index]['parse_group_name']
        common_cname = len(map_infos['data'][inp_index]['domains'])
        # Locate inp_domain in the selected entry; if nothing matches, find_it
        # is left at the last index.
        for find_it in range(common_cname):
            if map_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain:
                break
        account_id = map_infos['data'][inp_index]['domains'][find_it]['account_id']
        access_id = map_infos['data'][inp_index]['domains'][find_it]['access_id']
        # Save the ids as numbered lines in info.log (the file is overwritten).
        with open("info.log", 'w', encoding='utf-8') as obj_info:
            obj_info.write(f"3:{account_id}\n")
            obj_info.write(f"4:{access_id}\n")
        # Only a non-empty parse group is saved; an empty one ends the run with 201.
        if parse_group != '':
            with open("map.log", 'w', encoding='utf-8') as obj_map:
                obj_map.write(f"{parse_group}\n")
        else:
            sys.exit(201)
def domain_config_cdn(domain_info_log, inp_accid, domain):
    """Dump the CDN configuration of the matching account as JSON into a file.

    Args:
        domain_info_log: Path of a UTF-8 JSON file with a ``result`` list.
        inp_accid: Account id compared against each entry's ``account_id``.
        domain: Path of the output file (the caller passes the domain name).

    The first matching entry is serialized with ``json.dumps`` and written to
    ``domain`` followed by a newline. Nothing is written when no entry matches.

    Raises:
        SystemExit: With status 204 when ``result`` is empty.
    """
    with open(domain_info_log, encoding='utf-8') as obj_domain_info:
        domain_infos = json.load(obj_domain_info)
    if len(domain_infos['result']) == 0:
        sys.exit(204)
    for domain_info in domain_infos['result']:
        if domain_info['account_id'] == inp_accid:
            # Write the file directly instead of `echo $var > $file` through a
            # shell: unquoted expansion splits words, expands globs and lets
            # some `echo` implementations rewrite backslash escapes, all of
            # which can corrupt the JSON text.
            with open(domain, 'w', encoding='utf-8') as obj_config:
                obj_config.write(json.dumps(domain_info) + "\n")
            break
def domain_config_live(domain_info_log, domain):
    """Dump the live-streaming configuration under ``result`` as JSON into a file.

    Args:
        domain_info_log: Path of a UTF-8 JSON file with a ``result`` member.
        domain: Path of the output file (the caller passes the domain name).

    The value of ``result`` is serialized with ``json.dumps`` and written to
    ``domain`` followed by a newline.
    """
    with open(domain_info_log, encoding='utf-8') as obj_domain_info:
        domain_info = json.load(obj_domain_info)['result']
    # Write the file directly instead of `echo $var > $file` through a shell:
    # unquoted expansion splits words, expands globs and lets some `echo`
    # implementations rewrite backslash escapes, which can corrupt the JSON.
    with open(domain, 'w', encoding='utf-8') as obj_config:
        obj_config.write(json.dumps(domain_info) + "\n")
def parent_info_4(domain_info_log, inp_accid):
    """Export the parent layers of the CDN domain owned by ``inp_accid``.

    Uses the first entry of ``result`` in the JSON document at
    ``domain_info_log`` whose ``account_id`` equals ``inp_accid`` and hands its
    ``parent_id`` to ``get_parent_info`` together with ``parent.log``.

    Exits with status 204 when ``result`` is empty. Does nothing when no entry
    matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    records = payload['result']
    if len(records) == 0:
        sys.exit(204)
    record = next(
        (entry for entry in records if entry['account_id'] == inp_accid),
        None,
    )
    if record is None:
        return
    # 5. Parent scheme, identified by parent_id.
    get_parent_info("parent.log", record['parent_id'])
def parent_info_5(domain_info_log, inp_accid):
    """Write the ``domain_id`` of the matching account to ``info.log`` as ``2:<id>``.

    Uses the first entry of ``result`` in the JSON document at
    ``domain_info_log`` whose ``account_id`` equals ``inp_accid``. ``info.log``
    is overwritten; it is left untouched when no entry matches.
    """
    with open(domain_info_log) as log_file:
        payload = json.load(log_file)
    # The account id picks the single entry that belongs to the caller.
    record = next(
        (entry for entry in payload['result'] if entry['account_id'] == inp_accid),
        None,
    )
    if record is None:
        return
    with open("info.log", 'w', encoding='utf-8') as info_file:
        info_file.write(f"2:{record['domain_id']}\n")
# inp_accid is accepted but not used by this function.
def parent_info_6(domain_info_log, inp_accid):
    """Export the parent layers of the live-streaming domain stored under ``result``.

    Reads ``result.parent_id`` from the JSON document at ``domain_info_log``
    and hands it to ``get_parent_info`` together with ``parent.log``.
    """
    with open(domain_info_log) as log_file:
        live_conf = json.load(log_file)['result']
    # Parent scheme, identified by parent_id.
    get_parent_info("parent.log", live_conf['parent_id'])
def get_parent_info(parent_log, inp_parent_id):
    """Append the non-empty parent layers of a parent scheme to the file ``cmap``.

    Args:
        parent_log: Path of a UTF-8 JSON file whose ``result`` list holds one
            dict per parent scheme.
        inp_parent_id: Value compared against each entry's ``parent_id``.

    For the first matching entry, every non-empty layer value is appended to
    ``cmap`` as ``"<n>. <value>"``, numbered from 1 in ``parent_all`` order.
    ``cmap`` is not created when nothing matches or every layer is empty.
    """
    # NOTE(review): indentation was lost in the reviewed copy; the counter is
    # assumed to advance only for non-empty layers (consecutive numbering).
    parent_all = ["dyn_first_parent", "dyn_first_parent_all", "dyn_first_parent_backups", "dyn_second_parent", "first_parent", "first_parent_backups", "pre_first_parent", "pre_first_parent_backups", "pre_second_parent", "pre_second_parent_backups", "second_parent", "second_parent_backups"]
    with open(parent_log, encoding='utf-8') as obj_parent:
        parents = json.load(obj_parent)
    for parent in parents['result']:
        if parent['parent_id'] != inp_parent_id:
            continue
        # A layer key absent from the entry is treated like an empty layer.
        layers = [parent.get(parent_en, '') for parent_en in parent_all]
        layers = [layer for layer in layers if layer != '']
        if layers:
            # Open the output once rather than once per line.
            with open("cmap", 'a', encoding='utf-8') as obj_cmap:
                obj_cmap.writelines(
                    f"{index}. {layer}\n" for index, layer in enumerate(layers, start=1)
                )
        break
def quit(signum, frame):
    """Signal handler: print a farewell and exit with status 205.

    ``signum`` and ``frame`` are the standard handler arguments and are ignored.
    """
    print("Bye!")
    raise SystemExit(205)
def pretty_print2(col_1, col_2):
    """Print two left-aligned columns on one line.

    The base widths are 48 and 40 characters. Each width is reduced by half of
    the difference between the text's UTF-8 byte length and its character
    count (presumably to compensate for double-width CJK glyphs).
    """
    def _field(text, base_width):
        # Build one "%-<width>s" specifier for the given text.
        char_count = len(text)
        byte_count = len(text.encode('utf-8'))
        return "%-" + str(base_width - int((byte_count - char_count) / 2)) + "s"

    layout = _field(col_1, 48) + _field(col_2, 40)
    print(layout % (col_1, col_2))
def pretty_print3(col_1, col_2, col_3, col_4=0):
    """Print three left-aligned columns on one line.

    Args:
        col_1, col_2, col_3: The column texts (strings).
        col_4: Layout selector. ``0`` (default) uses base widths 48/40/30; any
            other value uses the compact widths 16/10/60.

    Each column's width is reduced by half of the difference between that
    column's UTF-8 byte length and its character count (presumably to
    compensate for double-width CJK glyphs). The third column uses its own
    text for this correction, not the second column's.
    """
    base_widths = (48, 40, 30) if col_4 == 0 else (16, 10, 60)
    layout = ""
    for text, base_width in zip((col_1, col_2, col_3), base_widths):
        byte_surplus = len(text.encode('utf-8')) - len(text)
        layout += f"%-{base_width - int(byte_surplus / 2)}s"
    print(layout % (col_1, col_2, col_3))
def pretty_print_data(width: list, cols: list):
    """Print one table row, padding each cell to its column width.

    ``width[i]`` is the base width of column ``i``; it is reduced by half of
    the difference between the cell's UTF-8 byte length and its character
    count. The cell at index 8 is parsed as a float and colored: red below 10,
    yellow below 30, green otherwise. The row ends with a newline.
    """
    for position, cell in enumerate(cols):
        char_count = len(cell)
        byte_count = len(cell.encode('utf-8'))
        cell_width = width[position] - int((byte_count - char_count) / 2)
        if position != 8:
            print(f"%-{cell_width}s" % cell, end='')
            continue
        # Column 8 gets a traffic-light color based on its numeric value.
        amount = float(cell)
        if amount < 10:
            color = c_br
        elif amount < 30:
            color = c_by
        else:
            color = c_bg
        print(f"{color}%-{cell_width}s{c_e}" % cell, end='')
    print()
def pretty_print_title(width: list, cols: list):
    """Print the table header row, every cell wrapped in the title color.

    ``width[i]`` is the base width of column ``i``; it is reduced by half of
    the difference between the cell's UTF-8 byte length and its character
    count. The row ends with a newline.
    """
    for position, heading in enumerate(cols):
        char_count = len(heading)
        byte_count = len(heading.encode('utf-8'))
        cell_width = width[position] - int((byte_count - char_count) / 2)
        print(f"{c_title}%-{cell_width}s{c_e}" % heading, end='')
    print()
def fmt_print_global(res_map):
    """Print every line of ``res_map`` as a table row under a colored header.

    Each line is split on whitespace into cells. The header is printed first
    and repeated whenever the running counter (which starts at 1 and grows by
    one per row) reaches a multiple of 25.
    """
    title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"]
    width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10]
    pretty_print_title(width, title)
    with open(res_map) as map_file:
        rows = map_file.readlines()
        # Counting from 2 mirrors a counter that starts at 1 and is bumped after each row.
        for counter, row in enumerate(rows, start=2):
            pretty_print_data(width, row.strip().split())
            if counter % 25 == 0:
                pretty_print_title(width, title)
def fmt_print_partial(res_map, view, query, domain, domain_map):
    """Print the rows of ``res_map`` whose third column is one of the views in ``view``.

    Args:
        res_map: Path of a UTF-8 text file with whitespace-separated columns.
        view: Path of a UTF-8 text file with one view name per line.
        query: The queried area, used only in the "no node" message.
        domain: The domain name, used only in the "no node" message.
        domain_map: The parse group name, used only in the "no node" message.

    Rows are printed view by view under a colored header, which is repeated
    whenever the running counter (1 plus the number of printed rows) reaches a
    multiple of 25. Lines of ``res_map`` with fewer than three columns (for
    example blank lines) are skipped.

    Raises:
        SystemExit: With status 202 when ``view`` is an empty file, and with
            status 206 when no row matches any view.
    """
    if not os.path.getsize(view):
        print(f"{c_br}请按照规则,输入正确的查询条件,退出...{c_e}")
        sys.exit(202)

    title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"]
    width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10]
    pretty_print_title(width, title)
    with open(res_map, encoding='utf-8') as obj_res_map, open(view, encoding='utf-8') as obj_view:
        views = [line.strip() for line in obj_view]
        # Split every line once up front instead of once per view.
        rows = [line.split() for line in obj_res_map]

    count = 1
    for view_s in views:
        for c_line in rows:
            # Guard against short lines, which have no third column to compare.
            if len(c_line) > 2 and c_line[2] == view_s:
                pretty_print_data(width, c_line)
                count += 1
                if count % 25 == 0:
                    pretty_print_title(width, title)
    if count == 1:
        print(f"{c_br}域名{domain}的解析组{domain_map}中,不存在{query}地区的覆盖节点,请确认。{c_e}\n")
        sys.exit(206)
def main():
    """Dispatch on the option in ``sys.argv[1]`` to the matching helper.

    The remaining command-line arguments are passed positionally to the
    selected helper. An unrecognized option does nothing.
    """
    argv = sys.argv
    option = argv[1]
    # Each handler reads its own arguments lazily, only when it is selected.
    handlers = {
        '--domain_info_1': lambda: domain_info_1(argv[2], argv[3]),
        '--domain_info_2': lambda: domain_info_2(argv[2], argv[3]),
        '--domain_info_3': lambda: domain_info_3(argv[2], argv[3]),
        '--domain_info_4': lambda: domain_info_4(argv[2], argv[3]),
        '--domain_info_5': lambda: domain_info_5(argv[2], argv[3]),
        '--domain_info_6': lambda: domain_info_6(argv[2], argv[3]),
        '--domain_map_info': lambda: domain_map_info(argv[2], argv[3]),
        '--map_info': lambda: map_info(argv[2], argv[3]),
        '--format-global': lambda: fmt_print_global(argv[2]),
        # Command line order is: query, view, res_map, domain, domain_map.
        '--format-partial': lambda: fmt_print_partial(
            argv[4], argv[3], argv[2], argv[5], argv[6]
        ),
        '--domain_config_cdn': lambda: domain_config_cdn(argv[2], argv[3], argv[4]),
        '--domain_config_live': lambda: domain_config_live(argv[2], argv[3]),
        '--parent_info_4': lambda: parent_info_4(argv[2], argv[3]),
        '--parent_info_5': lambda: parent_info_5(argv[2], argv[3]),
        '--parent_info_6': lambda: parent_info_6(argv[2], argv[3]),
    }
    handler = handlers.get(option)
    if handler is not None:
        handler()
# Script entry point: runs only when the file is executed directly.
if __name__ == "__main__":
    # Route SIGINT (Ctrl+C) to quit(), which prints "Bye!" and exits with status 205.
    signal.signal(signal.SIGINT, quit)
    main()