监控抖音用户最新作品更新可以微信公众号提醒WxPusher消息推送,要12个小时监控抖音用户有没有更新最新视频,批量监控抖音用户,代码要可以运行结果可以来
25条回答 默认 最新
关注 获得2.00元问题酬金 让阿豪来帮你解答,本回答参考chatgpt3.5编写提供,如果还有疑问可以评论或留言
解答: 问题分析:- 需要通过抖音接口获取用户最新作品的信息;
- 监控需要定时执行,可以使用Python的定时任务模块进行实现,如APScheduler或schedule;
- 需要将监控结果推送到微信公众号上,可以使用WxPusher消息推送服务;
- 批量监控抖音用户,需要从配置文件或数据库中获取需要监控的用户列表; 解决方案:
-
使用Python的requests模块访问抖音接口获取用户作品信息:
import requests import json def get_user_latest_video(user_id): url = 'https://www.iesdouyin.com/web/api/v2/aweme/post/' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'} params = { 'user_id': user_id, 'sec_uid': '', 'count': '1', 'max_cursor': '0', 'aid': '1128', '_signature': 'xxxxx' } response = requests.get(url, params=params, headers=headers) if response.status_code == 200: data = json.loads(response.text) if data['status_code'] == 0: return data['aweme_list'][0] return None
-
使用schedule模块实现定时任务:
import schedule import time def job(): # 获取需要监控的用户列表 user_list = get_user_list() for user_id in user_list: latest_video = get_user_latest_video(user_id) if latest_video: # 最新作品不为空,进行处理 handle_latest_video(latest_video) # 休眠5分钟 time.sleep(5 * 60) schedule.every(12).hours.do(job) while True: schedule.run_pending() time.sleep(1)
-
使用WxPusher进行消息推送:
import requests import json def push_wechat_message(title, content): token = 'xxxxx' url = 'https://wxpusher.zjiecode.com/api/send/message' data = { 'appToken': token, 'content': content, 'contentType': 1, 'topicIds': [], 'uids': ['UID_xxxxx'], 'url': '', 'urlTitle': '', 'miniProgram': {'appid': '', 'pagepath': ''}, 'type': 2, 'title': title } requests.post(url, data=json.dumps(data), headers={'Content-Type': 'application/json'})
-
从配置文件中获取需要监控的用户列表:
import configparser def get_user_list(): cfg = configparser.ConfigParser() cfg.read('users.ini') user_list = cfg.get('users', 'user_list').split(',') return user_list
完整代码示例:
import requests import json import schedule import time import configparser # 从配置文件中获取需要监控的用户列表 def get_user_list(): cfg = configparser.ConfigParser() cfg.read('users.ini') user_list = cfg.get('users', 'user_list').split(',') return user_list # 使用requests模块获取用户最新作品信息 def get_user_latest_video(user_id): url = 'https://www.iesdouyin.com/web/api/v2/aweme/post/' headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'} params = { 'user_id': user_id, 'sec_uid': '', 'count': '1', 'max_cursor': '0', 'aid': '1128', '_signature': 'xxxxx' } response = requests.get(url, params=params, headers=headers) if response.status_code == 200: data = json.loads(response.text) if data['status_code'] == 0: return data['aweme_list'][0] return None # 使用WxPusher进行消息推送 def push_wechat_message(title, content): token = 'xxxxx' url = 'https://wxpusher.zjiecode.com/api/send/message' data = { 'appToken': token, 'content': content, 'contentType': 1, 'topicIds': [], 'uids': ['UID_xxxxx'], 'url': '', 'urlTitle': '', 'miniProgram': {'appid': '', 'pagepath': ''}, 'type': 2, 'title': title } requests.post(url, data=json.dumps(data), headers={'Content-Type': 'application/json'}) # 处理最新作品信息 def handle_latest_video(video): # 处理逻辑 title = video['desc'] content = '最新作品链接:{}'.format(video['share_url']) push_wechat_message(title, content) # 定时任务 def job(): # 获取需要监控的用户列表 user_list = get_user_list() for user_id in user_list: latest_video = get_user_latest_video(user_id) if latest_video: # 最新作品不为空,进行处理 handle_latest_video(latest_video) # 休眠5分钟 time.sleep(5 * 60) # 每12个小时执行一次监控任务 schedule.every(12).hours.do(job) # 循环执行定时任务 while True: schedule.run_pending() time.sleep(1)
解决 无用评论 打赏 举报
悬赏问题
- ¥15 哪个tomcat中startup一直一闪而过 找不出问题
- ¥15 这个怎么改成直流激励源给加热电阻提供5a电流呀
- ¥50 求解vmware的网络模式问题 别拿AI回答
- ¥24 EFS加密后,在同一台电脑解密出错,证书界面找不到对应指纹的证书,未备份证书,求在原电脑解密的方法,可行即采纳
- ¥15 springboot 3.0 实现Security 6.x版本集成
- ¥15 PHP-8.1 镜像无法用dockerfile里的CMD命令启动 只能进入容器启动,如何解决?(操作系统-ubuntu)
- ¥30 请帮我解决一下下面六个代码
- ¥15 关于资源监视工具的e-care有知道的嘛
- ¥35 MIMO天线稀疏阵列排布问题
- ¥60 用visual studio编写程序,利用间接平差求解水准网