我的小程序需要创建1个微信群,方便我和用户快速沟通问题。但是我发现每7天都得换二维码。实在太麻烦了,于是我写了一个python脚本来实现自动获取微信群二维码并且自动更新数据库的微信群图片地址。
自动打开微信,搜索到微信群,获取二维码进行截图
上传截图到文件服务器ftp获取到图片地址
更新数据库中的二维码图片地址
下面是我的备份代码:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
使用ADB+OCR自动化完成以下流程:
1. 打开微信(com.tencent.mm)
2. 进入搜索并输入“A100”
3. 在搜索结果中点击“翰林刷题交流群A100”群聊
4. 进入群资料页面,打开“群二维码”,并截图保存
"""
import time
import subprocess
import time
import xml.etree.ElementTree as ET
from datetime import datetime
from ftplib import FTP
import os
import requests
import pymysql
from utils.config_util import load_config
from utils.adb_utils import ADBUtils
from utils.ocr_utils import OCRUtils
WECHAT_PACKAGE = 'com.tencent.mm'
def _adb_cmd(config):
return ADBUtils.get_adb_command(config)
def _tap(config, x, y):
adb = _adb_cmd(config)
timeout = config['adb']['timeout']
subprocess.run(adb + ['shell', 'input', 'tap', str(x), str(y)],
capture_output=True, text=True, check=True, timeout=timeout)
def _type_text_ascii(config, text):
"""向当前焦点输入框输入ASCII文本。中文直接输入可能失败,这里先输入'A100'做检索。"""
adb = _adb_cmd(config)
timeout = config['adb']['timeout']
subprocess.run(adb + ['shell', 'input', 'text', text],
capture_output=True, text=True, check=True, timeout=timeout)
def _press_enter(config):
adb = _adb_cmd(config)
timeout = config['adb']['timeout']
subprocess.run(adb + ['shell', 'input', 'keyevent', '66'],
capture_output=True, text=True, check=True, timeout=timeout)
def _find_text_center(ocr_result, keywords):
"""在OCR结果中查找包含任一关键字的文本框中心坐标。
返回 (x, y) 或 None
"""
if not ocr_result or not isinstance(ocr_result, dict):
return None
data = ocr_result.get('data')
if not isinstance(data, list):
return None
for item in data:
if not isinstance(item, dict):
continue
txt = item.get('text', '') or ''
if any(k in txt for k in keywords):
box = item.get('box')
if isinstance(box, list) and len(box) >= 4:
x1, y1 = box[0]
x2, y2 = box[1]
x3, y3 = box[2]
x4, y4 = box[3]
cx = int((x1 + x2 + x3 + x4) / 4)
cy = int((y1 + y2 + y3 + y4) / 4)
return (cx, cy)
return None
def _get_screen_size(config):
"""获取屏幕分辨率。"""
try:
adb = _adb_cmd(config)
result = subprocess.run(adb + ['shell', 'wm', 'size'],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
# 输出格式: Physical size: 1080x2340
output = result.stdout.strip()
if 'x' in output:
size_part = output.split(':')[-1].strip()
width, height = map(int, size_part.split('x'))
return width, height
except Exception as e:
print(f"获取屏幕分辨率失败: {e}")
return 1080, 2340 # 默认分辨率
def _try_click_top_right_button(config):
"""通过UI结构分析找到右上角的按钮并点击。"""
try:
# 获取屏幕分辨率
screen_width, screen_height = _get_screen_size(config)
# 获取UI结构
ui_content = ADBUtils.get_ui_structure(config)
if not ui_content:
return False
import xml.etree.ElementTree as ET
root = ET.fromstring(ui_content)
# 查找右上角的可点击元素
candidates = []
def find_clickable_elements(node):
# 检查是否可点击
clickable = node.get('clickable', 'false') == 'true'
bounds = node.get('bounds', '')
if clickable and bounds:
# 解析bounds: [x1,y1][x2,y2]
import re
match = re.match(r'\[(\d+),(\d+)\]\[(\d+),(\d+)\]', bounds)
if match:
x1, y1, x2, y2 = map(int, match.groups())
# 判断是否在右上角区域 (屏幕宽度的后1/4,高度的前1/4)
right_threshold = screen_width * 0.75
top_threshold = screen_height * 0.25
if x1 > right_threshold and y1 < top_threshold:
candidates.append({
'bounds': (x1, y1, x2, y2),
'center': ((x1 + x2) // 2, (y1 + y2) // 2),
'class': node.get('class', ''),
'resource_id': node.get('resource-id', ''),
'content_desc': node.get('content-desc', '')
})
# 递归查找子元素
for child in node:
find_clickable_elements(child)
find_clickable_elements(root)
# 首先尝试查找已知的resource-id
known_ids = [
'com.tencent.mm:id/right_btn',
'com.tencent.mm:id/more_btn',
'com.tencent.mm:id/action_more',
'com.tencent.mm:id/title_bar_right_btn'
]
for candidate in candidates:
if candidate['resource_id'] in known_ids:
print(f"通过resource-id找到右上角按钮: {candidate['center']}, id: {candidate['resource_id']}")
_tap(config, *candidate['center'])
time.sleep(2)
return True
# 如果没找到已知ID,选择最右上角的元素
if candidates:
# 按x坐标降序,y坐标升序排序,选择最右上角的
candidates.sort(key=lambda c: (-c['center'][0], c['center'][1]))
best_candidate = candidates[0]
print(f"找到右上角按钮: {best_candidate['center']}, class: {best_candidate['class']}")
_tap(config, *best_candidate['center'])
time.sleep(2)
return True
except Exception as e:
print(f"UI结构分析失败: {e}")
return False
def check_wechat_running(config):
"""检查微信是否正在运行。"""
try:
adb = _adb_cmd(config)
result = subprocess.run(adb + ['shell', 'ps', '|', 'grep', WECHAT_PACKAGE],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
# 如果grep找到了进程,说明微信正在运行
return WECHAT_PACKAGE in result.stdout
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
# 如果命令失败或超时,尝试另一种方法
try:
adb = _adb_cmd(config)
result = subprocess.run(adb + ['shell', 'pidof', WECHAT_PACKAGE],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
# 如果pidof返回了PID,说明微信正在运行
return result.stdout.strip() != ''
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
return False
def kill_wechat(config):
"""杀死微信进程。"""
try:
adb = _adb_cmd(config)
print('正在杀死微信进程...')
# 方法1:使用am force-stop
subprocess.run(adb + ['shell', 'am', 'force-stop', WECHAT_PACKAGE],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
# 等待一下确保进程被杀死
time.sleep(2)
# 验证是否成功杀死
if not check_wechat_running(config):
print('✅ 微信进程已成功杀死')
return True
else:
print('⚠️ am force-stop未能完全杀死微信,尝试kill命令...')
# 方法2:使用kill命令
result = subprocess.run(adb + ['shell', 'pidof', WECHAT_PACKAGE],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
pids = result.stdout.strip().split()
for pid in pids:
if pid.isdigit():
subprocess.run(adb + ['shell', 'kill', pid],
capture_output=True, text=True, check=True,
timeout=config['adb']['timeout'])
time.sleep(2)
if not check_wechat_running(config):
print('✅ 微信进程已通过kill命令杀死')
return True
else:
print('❌ 无法完全杀死微信进程')
return False
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
print(f"杀死微信进程时出错: {e}")
return False
def generate_timestamp_filename():
"""生成基于当前时间的文件名,格式为年月日时分秒.png"""
now = datetime.now()
return now.strftime("%Y%m%d%H%M%S.png")
def upload_to_ftp(local_file_path, ftp_host, ftp_user, ftp_password, remote_dir="wx_group"):
"""
上传文件到FTP服务器
Args:
local_file_path: 本地文件路径
ftp_host: FTP服务器地址
ftp_user: FTP用户名
ftp_password: FTP密码
remote_dir: 远程目录,默认为wx_group
Returns:
tuple: (上传是否成功, 上传后的文件名)
"""
try:
print(f'正在连接FTP服务器 {ftp_host}...')
# 连接FTP服务器
ftp = FTP()
ftp.connect(ftp_host, 21) # 默认FTP端口21
ftp.login(ftp_user, ftp_password)
print('✅ FTP连接成功')
# 切换到远程目录,如果不存在则创建
try:
ftp.cwd(remote_dir)
print(f'✅ 已切换到目录: {remote_dir}')
except Exception:
print(f'目录 {remote_dir} 不存在,正在创建...')
ftp.mkd(remote_dir)
ftp.cwd(remote_dir)
print(f'✅ 已创建并切换到目录: {remote_dir}')
# 生成新的文件名
new_filename = generate_timestamp_filename()
# 上传文件
print(f'正在上传文件: {local_file_path} -> {new_filename}')
with open(local_file_path, 'rb') as file:
ftp.storbinary(f'STOR {new_filename}', file)
print(f'✅ 文件上传成功: {new_filename}')
# 关闭FTP连接
ftp.quit()
return True, new_filename
except Exception as e:
print(f'❌ FTP上传失败: {e}')
try:
ftp.quit()
except:
pass
return False, None
def test_image_url(filename, base_url="https://file.20230611.cn", remote_dir="wx_group"):
"""
测试图片URL是否可访问
Args:
filename: 文件名
base_url: 基础URL
remote_dir: 远程目录
Returns:
tuple: (是否可访问, 完整URL)
"""
try:
full_url = f"{base_url}/{remote_dir}/{filename}"
print(f'正在测试图片URL: {full_url}')
response = requests.get(full_url, timeout=10)
if response.status_code == 200:
print(f'✅ 图片URL可正常访问: {full_url}')
return True, full_url
else:
print(f'❌ 图片URL访问失败,状态码: {response.status_code}')
return False, full_url
except Exception as e:
print(f'❌ 图片URL测试失败: {e}')
return False, full_url
def connect_database():
"""
连接MySQL数据库
Returns:
pymysql.Connection: 数据库连接对象,失败返回None
"""
try:
print('正在连接MySQL数据库...')
connection = pymysql.connect(
host='134.71.130.144',
port=6806,
user='learn_gaojiufeng1',
password='c32cltPLHB45MCGkd',
database='learn_gaojiufeng1',
charset='utf8mb4'
)
print('✅ 数据库连接成功')
return connection
except Exception as e:
print(f'❌ 数据库连接失败: {e}')
return None
def check_recent_update():
"""
检查最近3天内是否有成功更新记录
Returns:
bool: True表示最近3天内有更新,False表示没有更新或检查失败
"""
connection = None
try:
connection = connect_database()
if not connection:
return False
cursor = connection.cursor()
# 检查think_wx_group表中是否有最近3天内更新的记录
# 使用update_time字段(如果存在)或者通过wx_group_image字段的修改时间来判断
sql = """
SELECT COUNT(*) as count, MAX(update_time) as last_update
FROM think_wx_group
WHERE update_time >= DATE_SUB(NOW(), INTERVAL 3 DAY)
"""
cursor.execute(sql)
result = cursor.fetchone()
if result and result[0] > 0:
print(f'✅ 检测到最近3天内有更新记录,最后更新时间: {result[1]}')
print('🔄 跳过本次执行,避免重复更新')
return True
else:
print('📅 最近3天内无更新记录,继续执行脚本')
return False
except Exception as e:
# 如果表中没有update_time字段,尝试其他方式检查
try:
cursor = connection.cursor()
# 检查wx_group_image字段是否在最近3天内有变化
# 通过检查是否有非空的wx_group_image字段来判断
sql = """
SELECT COUNT(*) as count
FROM think_wx_group
WHERE wx_group_image IS NOT NULL
AND wx_group_image != ''
"""
cursor.execute(sql)
result = cursor.fetchone()
if result and result[0] > 0:
print('⚠️ 检测到已有二维码记录,可能最近已更新过')
print('🔄 为安全起见,跳过本次执行')
return True
else:
print('📅 未检测到二维码记录,继续执行脚本')
return False
except Exception as e2:
print(f'⚠️ 检查更新记录时出错: {e2}')
print('🔄 为安全起见,跳过本次执行')
return True
finally:
if connection:
connection.close()
def update_wx_group_image(image_url):
"""
更新think_wx_group表中所有记录的wx_group_image字段
Args:
image_url: 新的图片URL
Returns:
bool: 更新是否成功
"""
connection = None
try:
connection = connect_database()
if not connection:
return False
cursor = connection.cursor()
# 更新所有记录的wx_group_image字段和update_time字段
sql = "UPDATE think_wx_group SET wx_group_image = %s, update_time = NOW()"
cursor.execute(sql, (image_url,))
# 获取受影响的行数
affected_rows = cursor.rowcount
# 提交事务
connection.commit()
print(f'✅ 数据库更新成功,共更新了 {affected_rows} 条记录')
print(f'✅ 所有微信群的二维码地址已更新为: {image_url}')
return True
except Exception as e:
print(f'❌ 数据库更新失败: {e}')
if connection:
connection.rollback()
return False
finally:
if connection:
connection.close()
print('数据库连接已关闭')
def launch_wechat(config):
"""尽可能通用地启动微信。
优先使用monkey启动,避免依赖具体Activity名称。
"""
adb = _adb_cmd(config)
timeout = config['app']['launch_timeout']
try:
print('正在通过monkey启动微信...')
subprocess.run(adb + ['shell', 'monkey', '-p', WECHAT_PACKAGE,
'-c', 'android.intent.category.LAUNCHER', '1'],
capture_output=True, text=True, check=True, timeout=timeout)
return True
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e:
print(f"monkey启动失败,尝试am start: {e}")
try:
print('正在通过am start启动微信...')
# 常见LauncherUI路径:com.tencent.mm/.ui.LauncherUI
subprocess.run(adb + ['shell', 'am', 'start', '-n', f'{WECHAT_PACKAGE}/.ui.LauncherUI'],
capture_output=True, text=True, check=True, timeout=timeout)
return True
except (subprocess.CalledProcessError, subprocess.TimeoutExpired) as e2:
print(f"am start启动失败: {e2}")
return False
def enter_search(config):
"""尝试进入微信搜索界面:
1) 通过OCR点击“搜索”文本
2) 失败则发送KEYCODE_SEARCH作为回退
3) 再失败则尝试点击顶部区域
"""
print('尝试进入搜索界面...')
shot = ADBUtils.take_screenshot(config, 'wechat_main.png')
if shot:
ocr = OCRUtils.call_http_ocr_api(shot)
pos = _find_text_center(ocr, ['搜索'])
if pos:
print(f"找到'搜索',点击位置: {pos}")
_tap(config, *pos)
time.sleep(1.5)
return True
# 回退方案1:发送搜索键
try:
adb = _adb_cmd(config)
subprocess.run(adb + ['shell', 'input', 'keyevent', '84'],
capture_output=True, text=True, check=True, timeout=config['adb']['timeout'])
time.sleep(1.5)
return True
except Exception:
pass
# 回退方案2:点击顶部近似区域
print('使用顶部区域点击作为回退方案...')
try:
# 以1080x2400为参考,上方搜索栏大约在 y=120~200
_tap(config, 200, 160)
time.sleep(1.5)
return True
except Exception:
return False
def select_group_from_search(config):
"""在搜索界面输入'A100'后,点击“翰林刷题交流群A100”群聊。"""
print('输入关键词 A100 进行检索...')
_type_text_ascii(config, 'A100')
time.sleep(1)
_press_enter(config)
time.sleep(2)
shot = ADBUtils.take_screenshot(config, 'wechat_search_result.png')
if not shot:
return False
ocr = OCRUtils.call_http_ocr_api(shot)
pos = _find_text_center(ocr, ['翰林刷题交流群A100'])
if not pos:
# 兜底:先点击包含“A100”的条目
pos = _find_text_center(ocr, ['A100'])
if pos:
print(f"点击群聊条目位置: {pos}")
_tap(config, *pos)
time.sleep(2.5)
return True
print('未在搜索结果中找到目标群聊')
return False
def open_group_qrcode(config):
"""在群聊页面打开群资料,点击"群二维码"。"""
shot = ADBUtils.take_screenshot(config, 'wechat_group_chat.png')
if not shot:
return False
# 方案1:尝试通过UI结构找到右上角按钮
success = _try_click_top_right_button(config)
if not success:
# 方案2:OCR查找"..."或"更多"
ocr = OCRUtils.call_http_ocr_api(shot)
more_pos = _find_text_center(ocr, ['...', '更多', '︙'])
if more_pos:
print(f"通过OCR点击右上角更多: {more_pos}")
_tap(config, *more_pos)
time.sleep(2)
else:
# 方案3:根据屏幕分辨率估算右上角位置
screen_width, screen_height = _get_screen_size(config)
# 右上角大概位置:宽度的90%,高度的8%
estimated_x = int(screen_width * 0.9)
estimated_y = int(screen_height * 0.08)
print(f"尝试点击估算的右上角位置: ({estimated_x}, {estimated_y})")
_tap(config, estimated_x, estimated_y)
time.sleep(2)
# 在群资料页面查找“群二维码”并点击
shot2 = ADBUtils.take_screenshot(config, 'wechat_group_info.png')
if not shot2:
return False
ocr2 = OCRUtils.call_http_ocr_api(shot2)
qrcode_pos = _find_text_center(ocr2, ['群二维码', '二维码'])
if qrcode_pos:
print(f"点击群二维码: {qrcode_pos}")
_tap(config, *qrcode_pos)
time.sleep(2)
return True
print('群资料页面未找到“群二维码”')
return False
def screenshot_qrcode(config):
path = ADBUtils.take_screenshot(config, 'wechat_group_qrcode.png')
if path:
print(f"✅ 群二维码已截图保存: {path}")
return path
return None
def main():
config = load_config()
# 00. 检查最近3天内是否已有更新记录
print('🔍 检查最近3天内的更新记录...')
if check_recent_update():
print('✅ 检测到最近3天内已有更新,脚本退出')
return
# 01. 检查ADB与设备状态
if not ADBUtils.check_adb_available(config):
raise Exception('ADB未安装或配置错误')
if not ADBUtils.check_device_connected(config):
raise Exception('设备未连接')
if not ADBUtils.unlock_device(config):
raise Exception('设备解锁失败')
# 02. 检查微信是否已运行,如果运行则杀死
if check_wechat_running(config):
print('检测到微信正在运行,正在杀死进程...')
if not kill_wechat(config):
print('⚠️ 无法完全杀死微信进程,但继续尝试启动')
else:
print('微信未运行,可以直接启动')
# 03. 启动微信
if not launch_wechat(config):
raise Exception('微信启动失败')
print('等待微信加载...')
time.sleep(5)
# 03. 进入搜索
if not enter_search(config):
raise Exception('进入搜索界面失败')
# 04. 输入关键词并选择群聊
if not select_group_from_search(config):
raise Exception('未找到目标群聊')
# 05. 打开群二维码
if not open_group_qrcode(config):
raise Exception('未能打开群二维码页面')
# 06. 截图保存
screenshot_path = screenshot_qrcode(config)
if not screenshot_path:
raise Exception('二维码截图失败')
# 07. 上传到FTP服务器
ftp_host = "192.168.0.18"
ftp_user = "R7ttCsQPssip1"
ftp_password = "WMtdkXPWBp6JR"
print('正在上传截图到FTP服务器...')
upload_success, uploaded_filename = upload_to_ftp(screenshot_path, ftp_host, ftp_user, ftp_password)
if upload_success and uploaded_filename:
print('✅ 截图已成功上传到FTP服务器')
# 08. 测试图片URL是否可访问
url_accessible, image_url = test_image_url(uploaded_filename)
if url_accessible:
# 09. 更新数据库中的微信群二维码地址
print('正在更新数据库中的微信群二维码地址...')
if update_wx_group_image(image_url):
print('🎉 完整流程执行成功!')
print(f'📸 截图已保存: {screenshot_path}')
print(f'🌐 图片URL: {image_url}')
print('💾 数据库已更新')
else:
print('⚠️ 数据库更新失败,但图片已成功上传')
else:
print('⚠️ 图片URL无法访问,跳过数据库更新')
else:
print('⚠️ FTP上传失败,但截图已保存在本地')
if __name__ == '__main__':
main()【一】.钩子文件的设置和创建(1).打开hooks目录,可以看到有一个post-commit.tmpl文件,这是一个模板文件。复制一份,重命名为post-commit,将其用户组设为www,并设置为可执行。chown www:www post-commitchmod +x post-commit(2...
1.全局用户信息设置 git config --global user.name gaojiufeng git config --global user.email 392223903...
在安装之前我们先看看官方给出的依赖关系.首先是dll文件和mongodb软件的依赖关系然后是PHP文件和dll的依赖关系我的是phpstudy的集成环境PHP5.4.45 NTS+Apache+Mysql【一】.安装mongodb3.0软件对比依赖关系下载mongodb3.0.msi软件,完整名称:...
1.远程仓库的协作模式开发者把自己最新的版本推到线上仓库,同时把线上仓库的最新代码,拉到自己本地即可2.注册git帐号国外: http://www.github.com国内: http://git.oschina.net2.在码云创建项目,不要初始化readmegit push https://gi...
private const string fantizi = "高久峰是個程序員"; private const string jiantizi = "高久峰是个程序员...
1.文件redis-2.6.14.tar.gz的上传 /home/john/创建rdtar文件夹 上传redis-2.6.14.tar.gz至rdtar文件夹 2.解压文件 cd /home/john/rdtar tar &n...