scripts/alert/poem_send.py
2023-06-05 23:04:30 +08:00

121 lines
4.2 KiB
Python

import requests
import json
import time
import sys
def send_alert_msg(alert_msg):
"""
sending messages via Enterprise WeChat Bot with the content of a poem from Jinrishici API.
今日诗词:名句
【title】-【author】-【dynasty】
poem of complete
:param alert_msg:content with a specified format
:return: None
"""
# get the datetime, which is using at a failed situation
alert_datetime = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
# Enterprise WeChat Bot API and the format of body to send
hook_url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=ddea3f5f-fbfc-4c21-994a-71e9fc50e4ef'
body = {
"msgtype": "text",
"text": {
"content": alert_msg
}
}
# get the result of API call
res = requests.post(hook_url, data=json.dumps(body, ensure_ascii=False).encode('utf-8'))
# when failed, log it in /opt/logs/alert.log file
if res.status_code != 200:
with open('/opt/logs/alert.log', 'a', encoding='utf-8') as alert_log:
alert_log.write(alert_datetime + ' >>>> ')
alert_log.write('Failed sending message: ')
alert_log.write(alert_msg + '\n')
def poem():
"""
get the poem with Jinrishici API
:return: None
"""
# specify token in headers
headers = {'X-User-Token': 'dNigXSFtjhLbP5nf49piUPzmD7NoNHVz'}
api_url = 'https://v2.jinrishici.com/sentence'
# will try for 3 times, in case there are failed situations to get the poem
for i in range(3):
res = requests.get(api_url, headers=headers)
# translate json data to dict format
dict_substance = json.loads(res.content)
# when failed to get the content, try again
if res.status_code != 200 or dict_substance['status'] != 'success':
continue
# put necessary content to specific variables
# print(dict_substance)
poem_content = dict_substance['data']['content']
poem_title = dict_substance['data']['origin']['title']
poem_dynasty = dict_substance['data']['origin']['dynasty']
poem_author = dict_substance['data']['origin']['author']
poem_content_all = dict_substance['data']['origin']['content']
poem_translation = dict_substance['data']['origin']['translate']
# put poem translation in to the file /opt/scripts/alert/poem_trans.txt, if exists
with open('/opt/scripts/alert/poem_trans.txt', 'w', encoding='utf-8') as obj_poem_trans:
if poem_translation:
for item in poem_translation:
obj_poem_trans.write(item + '\n')
# if the translation part does not exist, put a '' into the file
else:
obj_poem_trans.write('')
# create the format of content which is intended to be send via EWB, aka Enterprise Wechat Bot
alert_msg = f"今日诗词:{poem_content}\n\n"
alert_msg += f"{poem_title}】-【{poem_author}】-【{poem_dynasty}\n"
for line in poem_content_all:
alert_msg += f"{line}\n"
# when successfully get the needed content, jump out of the for-loop
break
# after 3 times re-tries, still cannot get the content, then send the following warning message
else:
alert_msg = '当前无法获取今日诗词,请手动检查如下请求返回是否正确!\n'
alert_msg += 'curl "https://v2.jinrishici.com/sentence" -H "X-User-Token:dNigXSFtjhLbP5nf49piUPzmD7NoNHVz"'
# send it
send_alert_msg(alert_msg)
def trans():
"""
send the translation of the poem which is showed this morning, if exists
:return: None
"""
with open('/opt/scripts/alert/poem_trans.txt', 'r', encoding='utf-8') as obj_poem_trans:
alert_msg = obj_poem_trans.read()
# print(alert_msg)
if alert_msg:
alert_msg = f'今日诗词的译文:\n{alert_msg}'
send_alert_msg(alert_msg)
def main():
"""
do the actions according to different parameters
:return: None
"""
if sys.argv[1] == 'poem':
poem()
elif sys.argv[1] == 'trans':
trans()
else:
pass
if __name__ == '__main__':
main()