[init] initial commit

This commit is contained in:
2023-06-05 23:04:30 +08:00
commit 66b1dd4d70
72 changed files with 10079 additions and 0 deletions

603
old/ctc/get_infos.py Normal file
View File

@ -0,0 +1,603 @@
#-*- coding:utf-8 -*-
#!/usr/bin/env python3
import json
import sys
import signal
import readline
import os
c_title = '\033[1;4;31;42m' # title color
c_br = '\033[1;31m' # bold red
c_bg = '\033[1;32m' # bold green
c_by = '\033[1;33m' # bold yellow
c_bb = '\033[1;34m' # bold blue
c_bp = '\033[1;35m' # bold purple
c_bc = '\033[1;36m' # bold cyan
c_bir= '\033[1;3;31m' # * bold italic red
c_bib = '\033[1;3;34m' # * bold italic cyan
c_bic = '\033[1;3;36m' # bold italic cyan
c_e = '\033[0m' # reset
def get_parent(parent_log, inp_parent_id):
parent_all = {"dyn_first_parent": "动态一层父", "dyn_first_parent_all": "动态一层父所有", "dyn_first_parent_backups": "动态一层备父", "dyn_second_parent": "动态二层父", "first_parent": "一层父", "first_parent_backups": "一层备父", "pre_first_parent": "预部署一层父", "pre_first_parent_backups": "预部署一层备父", "pre_second_parent": "预部署二层父", "pre_second_parent_backups": "预部署二层备父", "second_parent": "二层父", "second_parent_backups": "二层备父"}
parent_related = {}
with open(parent_log) as obj_parent:
parents=json.loads(obj_parent.read())
for parent in parents['result']:
if parent['parent_id'] == inp_parent_id:
parent_name = parent['parent_name']
print(f"父方案: {parent_name}")
for parent_en, parent_cn in parent_all.items():
if parent[parent_en] != '':
parent_related[parent[parent_en]] = parent_cn
for parent_en, parent_cn in parent_related.items():
print(f"{parent_cn}: {parent_en}")
break
def get_respool(respool_log, inp_template_id, pool_type):
with open(respool_log) as obj_respool:
respools=json.loads(obj_respool.read())
for respool in respools['result']:
if int(respool['template_id']) == int(inp_template_id):
# print(f"{pool_type}: {respool['template_name']}")
return (f"{pool_type}: {respool['template_name']}")
def domain_info_1(domain_info_log, inp_domain):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 判断是否是重叠域名
multi = len(domain_infos['data'])
if multi == 0:
print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}")
sys.exit(205)
overlap = "" if multi > 1 else ""
inp_index = 1
if multi > 1:
print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}")
index = 1
flag = 0
# 遍历重叠域名的账号邮箱,需要输入确定的序号
for domain_info in domain_infos['data']:
print(f"账号{index} - ", end="")
for find_it in domain_info['domains']:
if find_it['domain'] == inp_domain:
pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}")
flag = 0
break
flag = 1
if flag == 1:
print()
flag = 0
index += 1
print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}")
# 验证index是合法输入的逻辑
inp_index = input()
if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index:
inp_index = int(inp_index)
else:
print(f"{c_br}请输入正确的序号,{c_e}", end="")
sys.exit(200)
inp_index -= 1
inp_index = inp_index if inp_index != 0 else 0
common_cname = len(domain_infos['data'][inp_index]['domains'])
for find_it in range(common_cname):
if domain_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain:
break
common_cname = '' if common_cname > 1 else ''
common_cnames = []
for domain in domain_infos['data'][inp_index]['domains']:
common_cnames.append(domain['domain'])
account = domain_infos['data'][inp_index]['domains'][find_it]['account_name']
account_id = domain_infos['data'][inp_index]['domains'][find_it]['account_id']
access_id = domain_infos['data'][inp_index]['domains'][find_it]['access_id']
email = domain_infos['data'][inp_index]['domains'][find_it]['email']
cname = domain_infos['data'][inp_index]['cname']
cname_vendor = domain_infos['data'][inp_index]['access_vendor_cname']
parse_group = domain_infos['data'][inp_index]['parse_group_name']
with open("info.log", 'w', encoding='utf-8') as obj_info:
obj_info.write(f"1:{account}\n")
obj_info.write(f"2:{email}\n")
obj_info.write(f"3:{account_id}\n")
obj_info.write(f"4:{access_id}\n")
pretty_print3(f"账户: {account}", f"邮箱: {email}", f"accId: {account_id}")
pretty_print3(f"Map: {parse_group}", f"accessId: {access_id}", f"重叠域名: {overlap}")
pretty_print3(f"合作方: {cname_vendor}", f"CNAME: {cname}", f"是否共享CNAME缓存: {common_cname}")
if common_cname == '':
print(f"共享CNAME缓存域名列表: {common_cnames}")
if parse_group == '':
sys.exit(201)
def domain_info_2(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
for acc_info in domain_infos['results']['items']:
if acc_info['accountId'] == inp_accid:
managerArea = acc_info['managerArea']
platformVipLevel = acc_info['platformVipLevel']
businessLevel = acc_info['businessLevel']
ctYunVipLevel = acc_info['ctYunVipLevel']
clientId = acc_info['clientId']
accountType = acc_info['accountType']
clientInsideName = acc_info['clientInsideName']
maintainAfterName = acc_info['maintainAfterName']
maintainAfterPhone = acc_info['maintainAfterPhone']
maintainAfterEmail = acc_info['maintainAfterEmail']
managerVendor = acc_info['managerVendor']
pretty_print3(f"售后姓名: {maintainAfterName}", f"售后电话: {maintainAfterPhone}", f"售后邮箱: {maintainAfterEmail}")
pretty_print3(f"天翼云VIP等级: {ctYunVipLevel}", f"平台VIP等级: {platformVipLevel}", f"客户VIP等级: {businessLevel}")
pretty_print3(f"clientId: {clientId}", f"客户内部名称: {clientInsideName}", f"商务渠道: {managerArea}")
pretty_print2(f"承载平台: {managerVendor}", f"客户类型: {accountType}")
break
def domain_info_3(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历重叠域名使用request id确定唯一的信息
for domain_info in domain_infos['data']['results']:
if domain_info['accountId'] == inp_accid:
statusName = domain_info['statusName']
ipv6Switch = domain_info['ipv6Switch']
productName = domain_info['productName']
innerTestDomain = domain_info['innerTestDomain']
ipv6Switch = '' if ipv6Switch == 1 else ''
innerTestDomain = '' if innerTestDomain == 1 else ''
pretty_print2(f"域名状态: {statusName}", f"是否开启IPv6: {ipv6Switch}")
pretty_print2(f"是否内部测试域名: {innerTestDomain}", f"产品类型: {productName}")
break
def domain_info_4(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
if len(domain_infos['result']) == 0:
sys.exit(204)
else:
for domain_info in domain_infos['result']:
if domain_info['account_id'] == inp_accid:
# 1. 回源地址
origin = []
for ori in domain_info['origin']:
origin.append(ori['role'] + ':' + ori['origin'])
# 2. 访问协议 + 端口
http_visit = domain_info['basic_conf']['http_server_port'] if domain_info['http_status'] == 'on' else 'X'
https_visit = domain_info['basic_conf']['https_server_port'] if domain_info['https_status'] == 'on' else 'X'
url_visit = str(http_visit) + '/' + str(https_visit)
# 3. 回源协议 + 端口
https_origin = str(domain_info['basic_conf']['https_origin_port'])
http_origin = str(domain_info['basic_conf']['http_origin_port'])
if domain_info['backorigin_protocol'] == 'follow_request':
url_origin = http_origin + '/' + https_origin
elif domain_info['backorigin_protocol'] == 'http':
url_origin = http_origin + '/X'
elif domain_info['backorigin_protocol'] == 'https':
url_origin = 'X/' + https_origin
else:
print("回源协议除了http/https/follow_request之外还有第四种方式请补充...")
sys.exit(201)
# 4. 证书备注名
cert_name = domain_info['cert_name']
# 6. 预部署资源池
pre_node_list = domain_info['pre_node_list']
off_pool = get_respool("respool.log", pre_node_list, '预部署资源池')
# 7. 全局资源池
node_list = domain_info['node_list']
on_pool = get_respool("respool.log", node_list, '全局资源池')
# 8. 是否热加载
conf_order_id = domain_info['conf_order_id']
conf_order_id = '' if conf_order_id == -1 else ''
pretty_print2(f"证书备注名: {cert_name}", f"热加载: {conf_order_id}")
pretty_print2(off_pool, on_pool)
print(f"回源地址: {origin}")
print(f"http/https访问: {url_visit}")
print(f"http/https回源: {url_origin}")
# 5. 父方案 parent_id
parent_id = domain_info['parent_id']
get_parent("parent.log", parent_id)
break
def domain_info_5(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
for domain_info in domain_infos['result']:
if domain_info['account_id'] == inp_accid:
with open("info.log", 'w', encoding='utf-8') as obj_info:
obj_info.write(f"4:{domain_info['domain_id']}\n")
break
# 如下accid没用到
def domain_info_6(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_info=json.loads(obj_domain_info.read())['result']
# 推拉流模式
push_stream_domain = ''
pull_stream_mode = domain_info['base_conf']['pull_stream_mode']
if pull_stream_mode == 0:
pull_stream_mode = "直播拉流(推拉流)"
push_stream_domain = domain_info['base_conf']['push_stream_domain']
elif pull_stream_mode == 1:
pull_stream_mode = "直播拉流(回源拉流)"
else:
pull_stream_mode = "直播推流"
# 证书备注名
if domain_info['protocol_control']['https_switch'] == 1:
cert_name = domain_info['protocol_control']['cert_name']
else:
cert_name = '无绑定证书'
pretty_print3(f"推拉流模式: {pull_stream_mode}", f"推流域名: {push_stream_domain}", f"证书备注名: {cert_name}")
# 预部署资源池
pre_node_list = domain_info['pre_resouce_id']
off_pool = get_respool("respool.log", pre_node_list, '预部署资源池')
# 全局资源池
node_list = domain_info['resouce_id']
on_pool = get_respool("respool.log", node_list, '全局资源池')
pretty_print2(off_pool, on_pool)
# 回源模式
origin_mode = domain_info['base_conf']['origin_mode']
for mode in origin_mode:
print(f"回源模式: {mode}")
mode_desc = domain_info['base_conf'][f'{mode}_origin']
for ori in mode_desc:
for k, v in ori.items():
if v != '':
print(f"{k}: {v}")
# 父方案 parent_id
parent_id = domain_info['parent_id']
get_parent("parent.log", parent_id)
def domain_map_info(domain_map_log, flg):
with open(domain_map_log) as obj_domain_map_log:
map_info=json.loads(obj_domain_map_log.read())
# 判断是否是重叠域名
parse_detail=map_info['parse_detail']
if int(flg) == 0:
print('------------------------------分区域解析------------------------------')
for item in parse_detail:
pretty_print3(item['area_cnname'], item['type'], item['value'], 1)
# write to file here
print('----------------------------------------------------------------------')
else:
with open('map.log', 'w') as obj_map_log:
for item in parse_detail:
obj_map_log.write(f"{item['value']}\n")
def map_info(map_info_log, inp_domain):
with open(map_info_log) as obj_map_info:
map_infos=json.loads(obj_map_info.read())
# 判断是否是重叠域名
multi = len(map_infos['data'])
if multi == 0:
print(f"{c_br}未找到该域名相关信息,可以登录网页系统查看是否有配置解析组,退出...{c_e}")
sys.exit(205)
inp_index = 1
if multi > 1:
print(f"{c_bp}该域名是重叠域名,请确认要查询域名的归属账号: {c_e}")
index = 1
flag = 0
# 遍历重叠域名的账号邮箱,需要输入确定的序号
for map_info in map_infos['data']:
print(f"账号{index} - ", end="")
for find_it in map_info['domains']:
if find_it['domain'] == inp_domain:
pretty_print3(f"账户: {find_it['account_name']}", f"邮箱: {find_it['email']}", f"accid: {find_it['account_id']}")
flag = 0
break
flag = 1
if flag == 1:
print()
flag = 0
index += 1
print(f"{c_by}请输入要查询域名归属账号的序号(e.g. 1, 2, 3...): {c_e}")
# 验证index是合法输入的逻辑
inp_index = input()
if inp_index.isdigit() and 1 <= int(inp_index) and int(inp_index) < index:
inp_index = int(inp_index)
else:
print(f"{c_br}请输入正确的序号,{c_e}", end="")
sys.exit(200)
inp_index -= 1
inp_index = inp_index if inp_index != 0 else 0
parse_group = map_infos['data'][inp_index]['parse_group_name']
common_cname = len(map_infos['data'][inp_index]['domains'])
for find_it in range(common_cname):
if map_infos['data'][inp_index]['domains'][find_it]['domain'] == inp_domain:
break
account_id = map_infos['data'][inp_index]['domains'][find_it]['account_id']
access_id = map_infos['data'][inp_index]['domains'][find_it]['access_id']
with open("info.log", 'w', encoding='utf-8') as obj_info:
obj_info.write(f"3:{account_id}\n")
obj_info.write(f"4:{access_id}\n")
if parse_group != '':
with open("map.log", 'w', encoding='utf-8') as obj_map:
obj_map.write(f"{parse_group}\n")
else:
sys.exit(201)
def domain_config_cdn(domain_info_log, inp_accid, domain):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
if len(domain_infos['result']) == 0:
sys.exit(204)
else:
for domain_info in domain_infos['result']:
config_json = json.dumps(domain_info)
os.environ['config_json'] = config_json
os.environ['domain_json'] = domain
if domain_info['account_id'] == inp_accid:
os.system("echo $config_json > $domain_json")
break
def domain_config_live(domain_info_log, domain):
with open(domain_info_log) as obj_domain_info:
domain_info=json.loads(obj_domain_info.read())['result']
config_json = json.dumps(domain_info)
os.environ['config_json'] = config_json
os.environ['domain_json'] = domain
os.system("echo $config_json > $domain_json")
def parent_info_4(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
if len(domain_infos['result']) == 0:
sys.exit(204)
else:
for domain_info in domain_infos['result']:
if domain_info['account_id'] == inp_accid:
# 5. 父方案 parent_id
parent_id = domain_info['parent_id']
get_parent_info("parent.log", parent_id)
break
def parent_info_5(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_infos=json.loads(obj_domain_info.read())
# 遍历账号名称相同的客户使用request id确定唯一的信息
for domain_info in domain_infos['result']:
if domain_info['account_id'] == inp_accid:
with open("info.log", 'w', encoding='utf-8') as obj_info:
obj_info.write(f"2:{domain_info['domain_id']}\n")
break
# 如下accid没用到
def parent_info_6(domain_info_log, inp_accid):
with open(domain_info_log) as obj_domain_info:
domain_info=json.loads(obj_domain_info.read())['result']
# 父方案 parent_id
parent_id = domain_info['parent_id']
get_parent_info("parent.log", parent_id)
def get_parent_info(parent_log, inp_parent_id):
parent_all = ["dyn_first_parent", "dyn_first_parent_all", "dyn_first_parent_backups", "dyn_second_parent", "first_parent", "first_parent_backups", "pre_first_parent", "pre_first_parent_backups", "pre_second_parent", "pre_second_parent_backups", "second_parent", "second_parent_backups"]
parent_related = {}
with open(parent_log) as obj_parent:
parents=json.loads(obj_parent.read())
for parent in parents['result']:
if parent['parent_id'] == inp_parent_id:
parent_name = parent['parent_name']
index = 1
for parent_en in parent_all:
if parent[parent_en] != '':
with open("cmap", 'a', encoding='utf-8') as obj_cmap:
obj_cmap.write(f"{index}. {parent[parent_en]}\n")
index += 1
break
def quit(signum, frame):
print("Bye!")
sys.exit(205)
def pretty_print2(col_1, col_2):
len_1 = len(col_1)
len_2 = len(col_2)
len_1_utf8 = len(col_1.encode('utf-8'))
len_2_utf8 = len(col_2.encode('utf-8'))
size_1 = 48 - int((len_1_utf8 - len_1) / 2)
size_2 = 40 - int((len_2_utf8 - len_2) / 2)
print(f"%-{size_1}s%-{size_2}s" % (col_1, col_2))
def pretty_print3(col_1, col_2, col_3, col_4=0):
len_1 = len(col_1)
len_2 = len(col_2)
len_3 = len(col_3)
len_1_utf8 = len(col_1.encode('utf-8'))
len_2_utf8 = len(col_2.encode('utf-8'))
len_3_utf8 = len(col_3.encode('utf-8'))
size_1 = 48- int((len_1_utf8 - len_1) / 2)
size_2 = 40 - int((len_2_utf8 - len_2) / 2)
size_3 = 30 - int((len_2_utf8 - len_2) / 2)
if col_4 == 0:
print(f"%-{size_1}s%-{size_2}s%-{size_3}s" % (col_1, col_2, col_3))
else:
size_1 = 16- int((len_1_utf8 - len_1) / 2)
size_2 = 10 - int((len_2_utf8 - len_2) / 2)
size_3 = 60 - int((len_2_utf8 - len_2) / 2)
print(f"%-{size_1}s%-{size_2}s%-{size_3}s" % (col_1, col_2, col_3))
def pretty_print_data(width: list, cols: list):
for i in range(len(cols)):
len_text = len(cols[i])
len_utf8 = len(cols[i].encode('utf-8'))
len_size = width[i] - int((len_utf8 - len_text) / 2)
if i == 8:
if float(cols[i]) < 10:
print(f"{c_br}%-{len_size}s{c_e}" % (cols[i]), end='')
elif float(cols[i]) < 30:
print(f"{c_by}%-{len_size}s{c_e}" % (cols[i]), end='')
else:
print(f"{c_bg}%-{len_size}s{c_e}" % (cols[i]), end='')
else:
print(f"%-{len_size}s" % (cols[i]), end='')
print()
def pretty_print_title(width: list, cols: list):
for i in range(len(cols)):
len_text = len(cols[i])
len_utf8 = len(cols[i].encode('utf-8'))
len_size = width[i] - int((len_utf8 - len_text) / 2)
print(f"{c_title}%-{len_size}s{c_e}" % (cols[i]), end='')
print()
def fmt_print_global(res_map):
title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"]
width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10]
pretty_print_title(width, title)
with open(res_map) as obj_res_map:
lines = obj_res_map.readlines()
count = 1
for line in lines:
pretty_print_data(width, line.strip().split())
count += 1
if count % 25 == 0:
pretty_print_title(width, title)
def fmt_print_partial(res_map, view, query, domain, domain_map):
if os.path.getsize(view):
title = ["大区", "省份", "View", "组名", "VIP", "LAKE", "压测带宽", "实时带宽", "冗余带宽", "昨晚高峰", "昨中高峰"]
width = [5, 18, 25, 25, 32, 15, 10, 10, 10, 10, 10]
pretty_print_title(width, title)
with open(res_map) as obj_res_map, open(view) as obj_view:
views = obj_view.readlines()
lines = obj_res_map.readlines()
count = 1
for view_s in views:
for line in lines:
c_line = line.strip().split()
if c_line[2] == view_s.strip():
pretty_print_data(width, c_line)
count += 1
if count % 25 == 0:
pretty_print_title(width, title)
if count == 1:
print(f"{c_br}域名{domain}的解析组{domain_map}中,不存在{query}地区的覆盖节点,请确认。{c_e}\n")
sys.exit(206)
else:
print(f"{c_br}请按照规则,输入正确的查询条件,退出...{c_e}")
sys.exit(202)
def main():
option = sys.argv[1]
if option == '--domain_info_1':
domain_info_log = sys.argv[2]
inp_domain = sys.argv[3]
domain_info_1(domain_info_log, inp_domain)
elif option == '--domain_info_2':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain_info_2(domain_info_log, inp_accid)
elif option == '--domain_info_3':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain_info_3(domain_info_log, inp_accid)
elif option == '--domain_info_4':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain_info_4(domain_info_log, inp_accid)
elif option == '--domain_info_5':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain_info_5(domain_info_log, inp_accid)
elif option == '--domain_info_6':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain_info_6(domain_info_log, inp_accid)
elif option == '--domain_map_info':
domain_map_log = sys.argv[2]
flg = sys.argv[3]
domain_map_info(domain_map_log, flg)
elif option == '--map_info':
map_info_log = sys.argv[2]
inp_accid = sys.argv[3]
map_info(map_info_log, inp_accid)
elif option == '--format-global':
res_map = sys.argv[2]
fmt_print_global(res_map)
elif option == '--format-partial':
query = sys.argv[2]
view = sys.argv[3]
res_map = sys.argv[4]
domain = sys.argv[5]
domain_map = sys.argv[6]
fmt_print_partial(res_map, view, query, domain, domain_map)
elif option == '--domain_config_cdn':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
domain = sys.argv[4]
domain_config_cdn(domain_info_log, inp_accid, domain)
elif option == '--domain_config_live':
domain_info_log = sys.argv[2]
domain = sys.argv[3]
domain_config_live(domain_info_log, domain)
elif option == '--parent_info_4':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
parent_info_4(domain_info_log, inp_accid)
elif option == '--parent_info_5':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
parent_info_5(domain_info_log, inp_accid)
elif option == '--parent_info_6':
domain_info_log = sys.argv[2]
inp_accid = sys.argv[3]
parent_info_6(domain_info_log, inp_accid)
if __name__ == "__main__":
signal.signal(signal.SIGINT, quit)
main()