日记数据深度分析报告:2025年12月30日

日记条目概览

日期: 2025年12月30日 (星期二,单周)

心情: 🙂 (积极)

天气: 城市: 河南正阳县, 天气: 霾, 温度: 5℃, 风向: 北, 风力: ≤3级, 湿度: 74% (报告时间: 10:00:12)

当日活动内容:

日记内容分析

今日的日记记录了一天充满技术探索与日常自动化维护的日程。整体心情显示为“积极”,尽管在某些方面遇到了挑战,但这并未影响日记主人对工作的热情和投入。

今日天气“霾”和“5℃”略显阴冷,但并未影响到日记主人的积极情绪和高效工作状态。这体现了其良好的心态和对技术的热爱。

Mobile仓库代码分析

提供的Python代码是一个名为 `UiAutomatr` 的类,它是一个功能丰富的移动设备自动化框架,主要用于Android设备。结合了 `uiautomator2` 和 `adb` 进行设备操作,并利用 `cv2`、`pytesseract`、`ddddocr` 进行图像识别和文字识别。此外,它还集成了数据存储 (`MySQLManager`, `RedisManager`)、错误报告 (`allure`, `logging`, `qq_email`, `dingmessage`) 和硬件控制 (`switch_2400`, `set_fan_speed`) 等模块,展现了其高度的复杂性和实用性。

核心功能与亮点:

自动化任务报告分析 (2025-12-30)

这份报告提供了多台手机在不同自动化任务中的表现数据,揭示了系统运行的现状、潜在问题和优化方向。报告生成于2025年12月30日22:41:00,耗时57.88秒,今日日志总数7,032条。

关键发现:

趋势与异常:

针对Mobile仓库的创新想法与建议

基于今日的日记记录、代码功能和自动化报告,以下是一些创新想法和具体建议,以提升Mobile自动化系统的效率、健壮性和智能性:

1. 提升ADB连接与设备稳定性(最优先):

2. 优化任务逻辑与收益效率:

3. 提升脚本健壮性与可维护性:

创意工具提案: "数字牧场AI助手"

设想一个基于Web的“数字牧场AI助手”平台,它不仅能监控所有移动设备的自动化任务,还能通过更智能的方式进行交互和故障排除。

功能特色:

技术栈构想:

通过“数字牧场AI助手”,用户将从被动处理故障转变为主动预防和智能管理,大幅提升自动化系统的整体价值和用户体验。


""" % full_code_string with allure.step(name): # Optional: 将层次结构信息附加到步骤 allure.attach(html_content, name=name, attachment_type=allure.attachment_type.HTML) if not path: path = self.screen('phonetimeout/' + self.model + 'ui.png') allure.attach.file(path,name=name, attachment_type=allure.attachment_type.PNG) if '失败' in mode: assert False, mode elif '跳过' in mode: pytest.skip(mode) elif '损坏' in mode: raise Exception(mode) except Exception as e: logging.error(e) def get_attribute_value(self, obj, attribute): try: value = ast.literal_eval(f'obj.{attribute}') return value except (SyntaxError, ValueError): return None def open_app1(self): self.multiple_find_picture([], 10, "点击", "不返回", delem=['d(resourceId="android.miui:id/app1")'], xy_percentage=True) def browse_task(self, key_list=['d(textContains="浏览完成")', 'd(description="任务完成")','d(text="任务完成")'], times=15, img: list = (), gep=0): for i in range(times): if i < 5: [x, y], k = self.multiple_find_picture([], 1, "点击", "不返回", delem=['d(text="打开")']) if gep: if i % gep != 0: # 多少次找一次元素 self.d.swipe_ext("up", random.uniform(0.4, 1)) continue time.sleep(random.uniform(2, 3)) # 浏览完成全部完成 [x, y], k = self.multiple_find_picture(img, 2, "不点击", "下滑",delem=key_list, is_print=False) if x > 0 and y > 0: break def back_task(self, key_dic={'d(text="每日任务")':"break", 'd(text="饿了么果园")': '点击', 'd(text="领水滴")':"点击"}): # d(text="每日任务") keys_list = list(key_dic.keys()) for i in range(5): [x, y], k = self.multiple_find_picture([], 2, "不点击", "不返回",delem=keys_list) if x > 0 and y > 0: value = key_dic.get(keys_list[k]) if value == '点击': self.click(x,y) time.sleep(5) elif value == 'break': return True elif value == '下滑': # 无需返回 # print("下滑") self.d.swipe_points([[0.8, 0.6], [0.8, 0.1]], 0.1) time.sleep(1) continue else: self.back() time.sleep(2) def calculate_sliding_distance(self, elem, y1=0.6, y2=0.55): # 计算距离确认是否有弹窗 if elem: center = elem.center() y1_loc = center[1] self.d.swipe_points([[0.8, y1], [0.8, y2]], 0.1) time.sleep(1) center = elem.center() y2_loc = center[1] distance = y1_loc - y2_loc # print(distance) if distance == 0: return False else: return distance else: return False def get_mobile_ipv6(self, ipv6_prefix="2409"): # 获取ipv6地址,已知小米13可以,其他手机没ipv6 # 方法2:adb shell "curl -s https://ipv6.ddnspod.com", 这个测试可以使用的ipv6 text = self.console_input('shell ip -6 addr show', ret=1) r_list = re.compile("(%s:\\S+)/64" % ipv6_prefix, re.S).findall(text) if r_list: print(r_list[0]) return r_list[0] def skip(self, z=3): if self.z >= z: pytest.skip("超过最大失败次数") ret = self.console_input('shell pm list packages', ret=1) if self.app_name not in ret: pytest.skip("没有安装app" + self.app_name) if not self.tao_user and not self.user_name: pytest.skip("没有安装获取到用户名" + self.model) def open_app(self, access_elem=[], name_elem=None,ad_elem=[]): for i in range(5): self.d.app_stop(self.app_name) time.sleep(1) print(self.app_name, self.activity) if self.activity: self.d.app_start(self.app_name, activity=self.activity) else: self.d.app_start(self.app_name) if not self.tao_user: print(" %s没有用户名需要获取用户名" % self.model) # d(resourceId="com.wuba:id/btn_ads") if ad_elem: self.multiple_find_picture([], 10, "点击", "不返回", [], delem=ad_elem, xy_percentage=True) time.sleep(2) # d.xpath('//*[@resource-id="com.jifen.qukan:id/app"]/android.widget.RelativeLayout[1]/android.widget.ImageView[1]') # 点击叉 [x, y], k = self.multiple_find_picture([], 20, "点击", "返回", [], delem=access_elem, xy_percentage=True) if x > 0 and y > 0: print("找到我的") for i in range(4): try: # d(resourceId="com.xunmeng.pinduoduo:id/pdd", text="yys534640040.club") if type(name_elem) == list: index = i % len(name_elem) # print('index=', index) elem = name_elem[index] else: elem = name_elem if elem.exists: text = elem.info['text'] if not text: # contentDescription text = elem.info['contentDescription'] self.tao_user = text.strip() print(self.tao_user) os.environ[self.ip] = str(self.tao_user) time.sleep(1) # self.multiple_find_picture([], 10, "点击", "不返回", [], delem=['d(text="取消")'], xy_percentage=True) # time.sleep(1) break time.sleep(1) except Exception as e: print(e) else: break if not self.tao_user: self.tb_timeout_option("找用户名失败") return False else: break def do_task_for_list(self, key_list=['去完成', '去领取'], title1_elem='sibling(index=0)', title2_elem = 'sibling(index=1)', app_name = 'com.eg.android.AlipayGphone', browse_key = {'关键字':['浏览', '指定动作奖励', '逛'], '成功元素':['d(textContains="浏览完成")', 'd(description="任务完成")','d(text="任务完成")'], '成功图片':[], '浏览次数':10}, back_key_dict = {'d(text="每日任务")':"break", 'd(text="饿了么果园")': '点击', 'd(text="领水滴")':"点击"}, get_rewards=['d(text="领取")', 'd(text="领奖")'], exclude_list=['多少运动币','外卖实付', '3个店铺'], watch_vd={}, other_task_dic={}, do_times=1, y_per_max=0.94, y_per_min=0.3, task_name='', delay = 6 ): # get_rewards: 任务完成后领取奖励按钮 # back_key_dict: 返回按钮 # watch_vd看视频的,如{'关键字':["看广告视频", '每日可完成7次'], "方法":self.watch_tv} # other_task_dic比如other_task_dic的扩展,多个字典{'惊喜水滴':self.other_task_dic} text2 = '' do_task = 0 # 没有一个满足key_list 则跳出循环 return_dic = {} # 统计没有完成的key_list max_ins = 5 for ins in range(15): # 初始索引到0-3,如果遇到不做的任务获准,做过的还有+1 for key_word in key_list: time.sleep(0.5) if ins <= 0: # 返回到做任务位置 is_true = self.back_task(key_dic=back_key_dict) if not is_true: return ins %= max_ins print('key_word=%s, ins=%s' % (key_word, ins)) try: # 计算return_dic all = len(self.d.xpath('//*[@text="%s"]' % key_word).all()) if all: return_dic[key_word] = all else: if key_word in return_dic: del return_dic[key_word] pass except Exception as e: pass # 查看是不是本应用 get_app_name = self.get_app_name() if get_app_name != app_name: print("不是本应用%s" % app_name) self.d.app_start(app_name) time.sleep(2) try: if '[%s]' in title1_elem: # print('2') click_elem = self.XPathElement(self.d, title1_elem % (key_word, ins + 1)) else: click_elem = self.d(text=key_word, instance=ins) # print(2, click_elem) if click_elem: is_numeric = lambda s: s.replace('.', '', 1).isdigit() if s.count('.') <= 1 else False if is_numeric(str(title1_elem).replace('-', '')) and is_numeric(str(title2_elem).replace('-', '')): # print('3') # 根据索引判断,is_numeric检查是否数字 ret = self.get_dump(re_text=key_word, select=ins) index = ret.get('index') + title1_elem text1 = self.d(index=index).info['text'] # title2 = eval(f"click_elem.{title2_elem}") index = ret.get('index') + title2_elem text2 = self.d(index=index).info['text'] # print(text1, text2) elif '//*' in title1_elem or '//com' in title1_elem: # xpath定位的情况 # print(self.d.xpath(title1_elem % key_word).all()[ins].info) if '[%s]' in title1_elem: # 带索引的xpath # self.XPathElement(self.d, title1 = self.XPathElement(self.d, title1_elem % (key_word, ins + 1)) text1 = title1.info['text'] if title1 else '' if title1_elem == title2_elem: # 相等就不需要获取标题2了 text2 = '' else: title2 = self.XPathElement(self.d, title2_elem % (key_word, ins + 1)) text2 = title2.info['text'] if title2 else '' else: title1 = self.d.xpath(title1_elem % key_word).all()[ins * 2] text1 = title1.info['text'] if title1 else '' if title1_elem == title2_elem: # 相等就不需要获取标题2了 text2 = '' else: title2 = self.d.xpath(title2_elem % key_word).all()[ins * 3] text2 = title2.info['text'] if title2 else '' else: # print("2323") # 使用getattr解析方法调用表达式字符串,获取兄弟元素 title1 = eval(f"click_elem.{title1_elem}") text1 = title1.info['text'] if title1 else '' title2 = eval(f"click_elem.{title2_elem}") text2 = title2.info['text'] if title2 else '' # print(text1, text2) width, height = self.get_phone_width_height() # print(6) # print(click_elem.info) y_per = click_elem.center()[1] / height if y_per < y_per_min: print(y_per, "下滑") self.d.swipe_ext("down", 0.5) time.sleep(1) continue elif y_per > y_per_max: print(y_per, "上滑") self.d.swipe_ext("up", 0.5) continue else: print("无需滑动") print(y_per) text = text1 + text2 if any(i in text for i in exclude_list): print("排除任务【%s】" % text) continue user_search = self.tao_user # text_search = text.replace("(", "").replace(")", "").replace("+", "").replace(' ', '').replace('*', '') # print(text_search) text_search = "%s已经完成%s任务" % (user_search + self.project, text) value = self.redis_manager.redis_search(text_search) print('value:', value,text) # print() # value = self.MySQLManager.sql_search("content", "%s已经完成%s任务" % (user_search + self.project, text_search)) if value and int(value.decode('utf-8')) >= do_times: print("已经做过【%s】无需再做" % text) continue do_task = 1 time.sleep(3) click_elem.click() # 点击词 time.sleep(1) self.redis_manager.redis_set(text_search,times='today', is_countable=True) # self.MySQLManager.write_file("%s已经完成%s任务" % (self.tao_user + self.project, text_search)) # 其他任务 # other_task_dic flag_other = 0 if other_task_dic: for action, params in other_task_dic.items(): if action in text: # print( action, params) try: if len(params) == 1: # 没有参数,直接调用函数 params[0]() else: # 有参数,解包元组后调用 params[0](*params[1:]) except Exception as e: params() flag_other = 1 # for i in other_task_dic: # if i in text: # dic = other_task_dic.get(i) # dic() if not flag_other: # 包含这些关键词的不需要再做了 # 浏览任务 if browse_key: key = browse_key.get('关键字') if key: succeed_key = browse_key.get('成功元素') succeed_pic = browse_key.get('成功图片') browse_times = browse_key.get('浏览次数') if key == '无' or any(i in text for i in key): self.browse_task(key_list=succeed_key, img=succeed_pic, times=browse_times) # 视频,广告 if watch_vd: key = watch_vd.get('关键字') if any(i in text for i in key): mean = watch_vd.get('方法') mean() # 直播 time.sleep(delay) # 返回 self.back_task(key_dic=back_key_dict) # 领取奖励 # pass if get_rewards: num = 4 flge = 0 for i in range(num): delem_list = [] pic_list = [] for type_elem in get_rewards: if 'd' in type_elem: delem_list.append(type_elem) else: pic_list.append(type_elem) [x, y], k = self.multiple_find_picture(pic_list, 3, "不点击", "不返回", delem=delem_list,xy_percentage=True) if x > 0 and y > 0: if y > 0.9: print(y, "上滑") if flge == 0: num += 5 flge = 1 self.d.swipe_ext("up", 0.5) else: self.click(x, y) time.sleep(2) pass else: break if flge == 1: for i in range(3): print("下滑") self.d.swipe_ext("down", 0.5) except Exception as e: print(e) if do_task == 0: self.d.swipe_ext("up", 0.5) if do_task == 0 and ins >= max_ins - 1: break if return_dic: try: # back_key_dict result_string = "" for key, value in return_dic.items(): result_string += f"{key}: {value}, " result_string = result_string.rstrip(', ') if back_key_dict: result_string += "_返回关键字:" +str(list(back_key_dict.keys())[0]).replace('"', '').replace("'", '') print("['%s','剩余任务次数','%s']" % (self.model + self.tao_user + self.project + task_name, result_string)) self.MySQLManager.write_file("['%s','剩余任务次数','%s']" % (self.model + self.tao_user + self.project + task_name, result_string)) except Exception as e: logging.error(e) def click_vd_cross(self, className="android.widget.ImageView", min_x=0.8, max_y = 0.1): for i in range(10): elem = self.d(resourceId="android:id/content").child(className=className, instance=i) if elem: center = elem.center() width, height = self.get_phone_width_height() y_per = center[1] / height x_per = center[0] / width # print(x_per, y_per) if x_per > min_x and y_per < max_y: # 叉子在左上角 print('点击叉', x_per, y_per) self.click(x_per, y_per) break def concatenate_images(self, image_paths, output_path): from PIL import Image # 打开所有图片并获取它们的尺寸 images = [Image.open(path) for path in image_paths] widths, heights = zip(*(img.size for img in images)) # 计算拼接后的总宽度和最大高度 total_width = sum(widths) max_height = max(heights) # 创建一个新图像,尺寸为拼接后图像的尺寸 concatenated_image = Image.new('RGB', (total_width, max_height)) # 将每张图片粘贴到新图像上 x_offset = 0 for img in images: concatenated_image.paste(img, (x_offset, 0)) x_offset += img.width # 保存拼接后的图片 concatenated_image.save(output_path) return output_path def click_ok_button(self): # git branch -d temp # '//*[@text="确定"]' # d.xpath('//*[@resource-id="android:id/button1"]') self.allure_screen("确定前") [x, y], k = self.multiple_find_picture([], 5, "点击", "不返回", xpath=['//*[@text="确定"]', '//*[@resource-id="android:id/button1"]'], xy_percentage=True) if x > 0 and y > 0: print("找到确定") self.allure_screen("确定后") def clear_app(self, app_name="com.jifen.qukan"): ret = self.console_input('shell pm clear %s' % app_name, ret=1) print('清除app%s数据' % app_name, ret) def yzm(self, times=8, swip=False): # d(text="请按照说明拖动滑块") mx = 0 i = 0 t1 = time.time() distance = 0 all_width = 1 for i in range(times + random.randint(0, 5)): try: [x, y], k = self.multiple_find_picture([], 10, "不点击", "不返回", delem=['d(resourceId="scratch-captcha-btn")', 'd(text="请按照说明拖动滑块")', 'd(text="验证码拦截")', 'd(text="访问被拒绝")'], xy_percentage=True) if x > 0 and y > 0: if k == 3: # 淘宝解决方案0 self.multiple_find_picture(['淘宝解决方案'], 2, "点击", "不返回") if swip: # get_canny # '淘宝验证码滑块按钮' [x, y], k = self.multiple_find_picture(['淘宝操作太频繁了','淘宝验证码滑块按钮'], 5, "不点击", "不返回", delem=['d(resourceId="scratch-captcha-btn")']) if x > 0 and y > 0: if k == 0: return False all_width, all_height = self.get_phone_width_height() width = all_width * 0.63 self.d.touch.down(x, y) time.sleep(0.3) self.d.touch.move(x+ width, y) time.sleep(0.5) # self.swipe(x, y, x + width, y) print("向右滑动总距离(方便截全图):", x + width) # 0.03 ,0.24 # # (y1(上):y2(下), x1(左):x2(右)) 0.37124999999999997 0.5812499999999999 0.1550925925925926 0.892 # print(y / all_height - 0.24, y / all_height - 0.01, x / all_width, 0.892, y / all_height) pic_path = self.screen(pic_name = self.ip + 'yzm.png', region=(y / all_height - 0.24, y / all_height - 0.02, x / all_width, 0.892)) x_list, path = self.find_and_show_rightmost_points(pic_path) # print(x_list) mx = random.choice(x_list) # print(i, ",回退,", x + mx, '占比', (mx / width) / width, ', mx=', mx) # 可以+一定距离(如x + mx + 10),我去掉了,看着能用 distance = x + mx print(i, '.图片选择的相对距离mx(不算按钮距离)',mx, ',绝对距离按钮',distance, ",swipe距离", x + width, y, distance , y, ', swipe_points距离比例:', [[(x + width)/all_width, y / all_height], [distance / all_width, y / all_height]]) # self.swipe(x + width, y, distance , y) # time.sleep(4) self.d.swipe_points([[(x + width)/all_width, y / all_height], [distance / all_width, y / all_height]], random.uniform(0.1, 0.8)) self.d.touch.up(width - mx, y) # self.d.swipe_points([[x, y], [x + mx, y]], random.uniform(0.1, 1)) time.sleep(1) pic_path = self.screen(pic_name = 'phonetimeout/' +self.ip + 'yzm.png') ret_path = self.concatenate_images([pic_path, path], self.screen_path + 'phonetimeout/' + self.ip + '合并.png') self.ui_layout(name='canny' + str(i) + "_SelectX_" + str(mx) + '_cost_%.2f' % (time.time() - t1), path=ret_path) num = random.randint(0, 2) # print(num) if num == 1: # 验证码点我反馈0 [x, y], k = self.multiple_find_picture(['验证码点我反馈'], 3, "点击", "不返回", xy_percentage=True) if x > 0 and y > 0: [x, y], k = self.multiple_find_picture([], 10, "点击", "不返回", delem=[random.choice(['d(text="滑动验证码后提示验证失败")', 'd(text="频繁看到该验证码")'])], xy_percentage=True) if x > 0 and y > 0: [x, y], k = self.multiple_find_picture([], 10, "点击", "不返回", delem=['d(text="提交")'], xy_percentage=True) self.back() time.sleep(5) else: try: ret = [distance / all_width, distance,i, time.time() - t1] if ret[0]: result = '距离比例%s, 距离%s,滑动次数%s,耗时%.2f秒' % (ret[0], ret[1], ret[2], ret[3]) print("['%s','淘宝滑动验证码结果','%s']" % (self.model + self.tao_user + self.project, result)) self.MySQLManager.write_file("['%s','淘宝滑动验证码结果','%s']" % (self.model + self.tao_user + self.project, result)) self.ui_layout(name=result) except Exception as e: print(e) return ret except Exception as e: print(e) def find_and_show_rightmost_points(self,image_path='/Users/yangyongsheng/nas/download/23.png', num_regions=5): # image_path = '/Users/yangyongsheng/nas/download/23.png' # 读取图像 img = cv.imread(image_path) if img is None: raise ValueError("无法读取图像,请检查图像路径。") # 转换为灰度图像 gray = cv.cvtColor(img, cv.COLOR_BGR2GRAY) # 应用高斯模糊 blurred = cv.GaussianBlur(gray, (5, 5), 0) # 应用Canny边缘检测 edges = cv.Canny(blurred, 50, 150) # 找到连通区域 num_labels, labels, stats, centroids = cv.connectedComponentsWithStats(edges, connectivity=8) # 排除背景,按面积排序,选出前num_regions个最大连通区域 largest_regions = np.argsort(stats[1:, cv.CC_STAT_AREA])[::-1][:num_regions] + 1 rightmost_points = [] # 计算每个连通区域的最右边点 for region in largest_regions: mask = (labels == region).astype(np.uint8) contours, _ = cv.findContours(mask, cv.RETR_EXTERNAL, cv.CHAIN_APPROX_SIMPLE) if contours: rightmost_point = max(contours[0], key=lambda point: point[0][0]) rightmost_points.append(tuple(rightmost_point[0])) # 打印并标注最右边点 x_lis = [] for i, point in enumerate(rightmost_points): print(f"连通区域 {i+1} 的最右边点坐标: {point}") # 在图像上标注最右边点 cv.circle(img, point, 5, (0, 255, 0), -1) cv.putText(img, f"{point}", (point[0] + 10, point[1]), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 1) x_lis.append(point[0]) # 展示图像 # plt.figure(figsize=(10, 10)) # plt.imshow(cv.cvtColor(img, cv.COLOR_BGR2RGB)) # plt.title("Rightmost Points") # plt.show() save_path = image_path.replace('.png', '_marked.png') cv.imwrite(save_path, img) print(f"标记后的图像已保存到 {save_path}") # print(x_lis) try: move = max(x_lis) x_lis.remove(move) except Exception as e: print(e) print(x_lis) return x_lis, save_path def get_appsize(self): # 小米应用商店 [x, y], k = self.multiple_find_picture([], 10, "点击", "返回", delem=['d(text="我的")']) if x > 0 and y > 0: [x, y], k = self.multiple_find_picture([], 6, "点击", "不返回", [], delem=['d(text="应用卸载")']) if x > 0 and y > 0: # elem = self.d(text="应用卸载") self.d(text="应用卸载").exists(timeout=3) # elem_zise = self.d(text="应用大小") # if not elem_zise: time.sleep(2) # 点击一下有用 try: self.d(resourceId="android:id/icon1") except Exception as e: self.d(resourceId="com.xiaomi.market:id/sort_mode_chooser").click() time.sleep(2) try: self.d(resourceId="android:id/text1", text="应用大小") except Exception as e: self.d(text="应用大小").click() d = self.d time.sleep(4) # 获取 hierarchy print("获取 hierarchy") xml_string = d.dump_hierarchy() # print() # 解析 XML root = etree.fromstring(xml_string.encode('utf-8')) # 示例 XPath 查询 # 1. 根据 class 定位所有 TextView 元素 # d.xpath('//*[contains(@text="大小: ")]') text = '' text_views = root.xpath('//node[contains(@text, "大小: ")]') for text_view in text_views: sibings = text_view.getparent().getchildren() size = text_view.get('text') # print(f"TextView Text: {size}") preceding_sibling = text_view.xpath('./preceding-sibling::node[1]') if preceding_sibling: app_name = preceding_sibling[0].get('text') # print(f"{app_name}{size}") text += f"{app_name} {size}
" # print(text) if text: print("['%s','应用大小排行','%s']" % (self.model + self.ip, text)) self.MySQLManager.write_file("['%s','应用大小排行','%s']" % (self.model + self.ip, text)) def stop_background_apps(self, ip, port): from concurrent.futures import ThreadPoolExecutor # 构建adb连接命令 adb_connect_command = f"{self.adb_path} connect {ip}:{port}" subprocess.run(adb_connect_command, shell=True) # 获取后台应用列表 get_background_apps_command = self.adb_path + " %s:%s shell dumpsys activity activities" % (ip, port.replace(":", '')) print(get_background_apps_command) # result = subprocess.run(get_background_apps_command, shell=True, capture_output=True, text=True) result = os.popen(get_background_apps_command).read() print(result) # 使用正则表达式获取包名 pattern = re.compile(r'ActivityRecord{.*? u0 (.*?)/') matches = pattern.findall(result.stdout) print("后台应用列表:", matches) # 停止所有后台应用 def stop_single_app(package_name): if "com.miui.home" not in package_name and "com.google.android.systemui" not in package_name: stop_app_command = f"{self.adb_path} {ip}:{port} shell am force-stop {package_name}" subprocess.run(stop_app_command, shell=True) print(f"应用 {package_name} 已停止") # 使用多线程并行停止应用 with ThreadPoolExecutor() as executor: executor.map(stop_single_app, matches) def open_accessibilit(self, ip=''): if ip: self.ip = ip self.port = "5555" self.read() # adb -s 192.168.31.182 shell am start -a android.settings.ACCESSIBILITY_SETTINGS self.console_input("shell am start -a android.settings.ACCESSIBILITY_SETTINGS") [x, y], k = self.multiple_find_picture([], 20, "点击", "不返回", delem=['d(text="已下载的应用")', 'd(text="已下载的服务")', 'd(text="更多已下载的服务")']) if x > 0 and y > 0: time.sleep(0.5) Accessibility_name = '我的Accessibility' text_list = [] elem2 = self.d(text="我的Accessibility").sibling(resourceId="android:id/summary") if elem2.exists: text = elem2.info["text"] # print(text) text_list.append(text) elem = self.d.xpath('//*[@text="%s"]/parent::*/following-sibling::*//android.widget.TextView' % Accessibility_name) if elem.exists: text = elem.info["text"] text_list.append(text) # print(text) self.allure_screen("是否关闭Accessibility") print(text_list) if '关闭' in text_list or "已关闭" in text_list: print("打开Accessibility") [x, y], k = self.multiple_find_picture([], 20, "点击", "不返回", delem=['d(text="%s")' % Accessibility_name]) if x > 0 and y > 0: time.sleep(1) [x, y], k = self.multiple_find_picture([], 20, "点击", "不返回", delem=['d(resourceId="android:id/switch_widget")', 'd(resourceId="android:id/widget_frame")']) if x > 0 and y > 0: for i in range(12): [x, y], k = self.multiple_find_picture([], 10, "不点击", "不返回", delem=['d(text="确定")']) if x > 0 and y > 0: # d(resourceId="com.miui.securitycenter:id/check_box") self.d(resourceId="com.miui.securitycenter:id/check_box").click() self.d(text="确定").click() time.sleep(1) self.d.press("home") break else: time.sleep(1) def inspect_power(self, ip, port, threshold=40): try: if ip: self.ip = ip if port: self.port = port self.read() output = self.console_input('shell dumpsys battery', ret=1) match = re.search(r'level: (\d+)', output) power = match.group(1) int_power = int(power) print(self.ip, '电量', int_power) if int_power <= threshold and self.ip != self.main_mobile_ip: if self.d: self.d.screen_off() # 关闭屏幕 self.create_cron_switch(40, mip=self.ip) print("%s电量过低,等待40分钟充电" % self.ip) # time.sleep(1800) else: print("不满足充电") except Exception as e: print(e) def close_mobile(self, ip='', port=''): if ip: self.ip = ip if port: self.port = ':' + port self.read() # print(1) # 1.关闭所有app # 使用adb命令关闭所有后台进程 try: self.stop_background_apps(ip, port) except Exception as e: print(e) # print(2) # 2.回到桌面 self.d.press("home") # 3.获取手机电量 self.inspect_power(ip, port) time.sleep(1) # 4.关闭屏幕 self.d.screen_off() def open_location(self, app_name): # 打开定位 self.console_input("shell pm grant %s android.permission.ACCESS_FINE_LOCATION" % app_name) def china_yzm(self): for i in range(50): path = self.screen("yzm/%s.jpg" % (self.ip + 'qm4' + str(i)), elem=self.d(resourceId="captcha").child(className="android.view.View").child(className="android.view.View")) print(path) ret = pytesseract_pic_to_text(path, 'chinese') print(ret) self.d(text="换一换").click() time.sleep(2) # path = '

' % path # path = self.screen("phonetimeout/%s.jpg" % (self.ip + 'qm')) # path = '

' % path # 请依次点击: "虞" "庖" "快" "联" # d(text="换一换") # qq_email("", "%s%s七猫%s满足10元提现,请手动提现,有验证码" % (self.model, self.u2c.tao_user, flag), path) pass def get_allphone_temperature_max(self): win_mobile_ips = self.read_yaml['win_mobile_ip'] temperature_list = [] for dic in win_mobile_ips: mip = list(dic.values())[0] mport = ':5555' device_addr = f"{mip}{mport}" if mip == self.main_mobile_ip: print("不算主力手机") continue try: # 1. 使用 'grep' 替代 'findstr' # 2. 将连接和执行命令分开,方便调试 connect_cmd = f"{self.adb_path} connect {device_addr}" subprocess.run(connect_cmd, shell=True, check=True, capture_output=True, timeout=5) temp_cmd = f"{self.adb_path} -s {device_addr} shell dumpsys battery" process = subprocess.Popen(temp_cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) output_bytes, error_bytes = process.communicate(timeout=10) # 2. 将命令输出的字节串(bytes)解码(decode)为字符串(string) output_str = output_bytes.decode('utf-8') error_str = error_bytes.decode('utf-8') if error_str: print(f"命令执行出错 for {mip}: {error_str}") continue # 3. 在搜索前先检查是否匹配成功,避免AttributeError match = re.search(r'temperature:\s*(\d+)', output_str) if match: temperature_raw = match.group(1) temperature = int(temperature_raw) / 10 temperature_list.append(temperature) print(f"成功获取 {mip} 的温度: {temperature}°C") else: print(f"未能从 {mip} 的输出中找到温度信息") except subprocess.TimeoutExpired: print(f"连接或执行命令超时 for {mip}") except subprocess.CalledProcessError as e: print(f"ADB连接失败 for {mip}: {e.stderr.decode('utf-8')}") except Exception as e: print(f"发生未知错误 for {mip}: {e}") print("\n所有成功获取的手机温度:", temperature_list) return temperature_list def set_fan_speed(self): temp_list = self.get_allphone_temperature_max() # ESP32_fan ESP32_fan = self.read_yaml.get('switch_ip', {}).get('ESP32_fan', '') # http://ESP32_fan/setSpeed?value=1 max_temp = max(temp_list) if temp_list else 0 speed = 0 if max_temp >= 50: speed = 100 elif max_temp >= 45: speed = 60 elif max_temp >= 40: speed = 50 elif max_temp >= 35: speed = 40 elif max_temp >= 30: speed = 30 else: speed = 0 url = f'http://{ESP32_fan}/setSpeed?value={speed}' response = requests.get(url) print(f"设置风扇速度为 {speed}, 温度 {max_temp}, 结果: {response.text}") # print(f"设置风扇速度为 {speed}, 温度 {max_temp}") def check_pause_status(self): """ 带有频率限制的暂停检查 """ # 1. 检查是否在冷却时间内 current_time = time.time() if current_time - self.last_pause_check_time < self.pause_check_interval: return # 时间没到,直接跳过,不查 Redis,零消耗 # 2. 更新最后检查时间 self.last_pause_check_time = current_time # 3. 真正去查 Redis pause_key = f"STOP_DEVICE_{self.ip}" try: # 你的 redis_manager.redis_search 返回的是 value 或 None # 注意:你的类可能返回 bytes,需要判断 is_paused = self.redis_manager.redis_search(pause_key) if is_paused: print(f"🔴 [Redis信号] 检测到暂停指令!设备 {self.ip} 正在暂停...") self.MySQLManager.write_file(f"设备 {self.ip} 接收暂停指令,进入等待...") # 进入死循环等待 while True: time.sleep(5) # 暂停期间每5秒查一次,不消耗资源 # 暂停期间必须强制查 Redis,忽略冷却时间 check_again = self.redis_manager.redis_search(pause_key) if not check_again: print(f"🟢 [Redis信号] 暂停解除!设备 {self.ip} 恢复运行。") self.MySQLManager.write_file(f"设备 {self.ip} 恢复运行。") break except Exception as e: # Redis 连接失败不应该导致脚本崩溃,打印错误并跳过 print(f"检查暂停状态时 Redis 报错: {e}") class Test(UiAutomatr): def __init__(self, ip="", this_port=":5555", user='523040'): super(Test, self).__init__() self.ip = ip self.port = this_port self.z = 0 self.read() self.tao_user = user # self.create_cron_switch(mip=self.ip, minutes=20) # self.create_cron_switch(40, mip=self.ip) # self.set_fan_speed() # self.multiple_find_picture(['电信'], 20, "点击", "不返回",xy_percentage=True) # new_time = (time.strftime("%H:%M", time.localtime(time.time()+20*60))) # print("20分钟后打开手机", new_time) # switch_2400(ip="192.168.31.162", mode='开', off_list = [str(new_time)], on_list = []) self.open_url_with_browser("https://page.cainiao.com/cn-user-growth/cnug-templete/index.html#/?strategyId=112471&token=834%25252Bzknfj4SqM45f6OTR1v7x8ie%25252BrDGI5sweEjyNlB0UBAkkgFb2GQhg5qk%25253D&taskPanel=water") print("音量降低到0") elem = self.d.xpath('//*[@resource-id="taskListWrap"]/preceding-sibling::android.widget.TextView') if elem.exists: print("存在") elem.click() else: # '//*[@resource-id="taskWrap"]/android.view.View[2]' print("不存在") elem = self.d.xpath('//*[@resource-id="taskWrap"]/android.view.View[2]') if elem.exists: print('存在//*[@resource-id="taskWrap"]/android.view.View[2]') elem.click() time.sleep(200) exit() # ret = self.XPathElement(self.d, "//*[@text='每日任务列表']/*[2]/*") # if ret: # print(ret.info) # self.get_appsize() # = '//*[@text="%s"]' % self.tao_user # elem = self.d.xpath('//*[@text="淘金币标题"]/following-sibling::*//*[contains(@text, "可抵")]/preceding-sibling::android.widget.ListView') # if elem.exists: # print(elem) # ret = self.d.dump_hierarchy() # tjb_yzm = self.screen('yzm/%s.jpg' % (self.ip + 'tjb'), elem=self.d.xpath('//*[@text="淘金币"]/../following-sibling::android.view.View/android.widget.ListView')) # print(tjb_yzm) # ret = self.recognize_text_dddocr(tjb_yzm) # 让其不要过滤“无用”节点,拿最完整的树 # self.d.settings["ignoreUnimportantViews"] = False # 让 API 返回完整结构(不用 compact 响应) # self.d.settings["compactResponse"] = False # 列出所有可写设置项 # (y1(上):y2(下), x1(左):x2(右)) # [x, y], k = self.multiple_find_picture(['微信钱包'], 20, "不点击", "不返回",xy_percentage=True) # 每日任务列表 # self.d.swipe_points([[0.694, 0.32], [0.1, 0.32]], 0.1) # path = self.screen('%selemgy.jpg' % (self.ip ), region=(0.7, 1, 0.01, 1)) # time.sleep(1) # ret = pytesseract_pic_to_text(path) # if ret: # center_text = ret.get('center_text') # print(center_text) # for i in center_text: # value = center_text.get(i) # print(i, value) # if '已拥有果园' in value: # print(value) # break # print(ret) # tjb_yzm = self.screen('yzm/%s.jpg' % (self.ip + 'weixin'),region=(0.218, 0.26, 0.616, 0.858)) # x, y = 0.23, 0.23 # print(0, y, 0, 1) # tjb_yzm = self.screen('yzm/%s.jpg' % (self.ip + 'ucye'),region=(0.01, 0.17, 0, 1)) tjb_yzm = self.screen('yzm/%s.jpg' % (self.ip + 'zhifubaono'),region=(0.098, 0.17, 0, 1)) print(tjb_yzm) # , binarize_num=190 ret = pytesseract_pic_to_text(tjb_yzm, lang="eng")['center_text'] for i in ret: text = ret.get(i) # print(text) if "*" in text: print(ret) ret = self.recognize_text_dddocr(tjb_yzm) if ret: tao_user = ret time.sleep(399) # money = pytesseract_pic_to_text(tjb_yzm).get('all_text') # if money: # money = money.replace('#', '') # print(money) # tjb_yzm = self.screen('yzm/%s.jpg' % (self.ip + 'weixin2')) # all_text = pytesseract_pic_to_text(tjb_yzm).get('all_text') # print("available settings keys:", self.d.settings.available_keys()) # print("available settings keys:", self.d.settings.available_keys()) # # 或者从 __dict__ 里看内部存储 # print("settings __dict__:", self.d.settings.__dict__) # ret = self.d.dump_hierarchy() # # 正确调用:加上 self. # print("验证码是:", ret) # open(r'F:\备份\mi\hierarchy.xml', 'w', encoding='utf-8').write(ret) # print() # self.do_task_for_list() # self.clear_mobile() # self.click_vd_cross() # self.jurisdiction() # self.click_vd_cross() # self.get_mobile_ipv6() # print("打开清理软件") # d(text="浏览60秒") # self.open_accessibilit() # self.export_app() try: self.china_yzm() except Exception as e: print(e) # self.multiple_find_picture([], 20, "不点击", "不返回", delem=['d(textContains="个商品得奖励")'],xy_percentage=True) # self.d.app_start("com.miui.cleanmaster", activity="com.miui.optimizecenter.MainActivity") # self.d.app_start("com.jifen.qukan") # self.do_task_for_list(key_list=['去完成', '去领取', ' 去完成', ' 去领取'], # title1_elem='sibling(index=1).child(index=0)', # title2_elem = 'sibling(index=1).child(index=1)', # app_name = 'com.cainiao.wireless', # # '逛' # browse_key = {'关键字':['浏览'], '成功元素':['d(textContains="浏览完成")', 'd(description="任务完成")','d(text="任务已完成")'], '成功图片':[], '浏览次数':10}, # back_key_dict = {'d(text="去完成")':"break", 'd(text=" 去完成")':"break",'d(text=" 去领取")':"break", 'd(text="return to top")':"点击", 'd(text="去访问")':"点击", 'd(text="免费领水果")':"点击", }, # get_rewards=['d(text="领取")', 'd(text="领奖")', 'd(text=" 领取")', 'd(text=" 领奖")'], # exclude_list=['买好物'], # other_task_dic = {"惊喜水滴":self.jxsd}, # watch_vd={'关键字':["推荐内容"]}, # task_name="菜鸟免费领水果", # ) # self.do_task_for_list(key_list=['去完成'], # title1_elem='(//*[@text="%s"])[%s]/../preceding-sibling::*/*[1]', # title2_elem = '(//*[@text="%s"])[%s]/../preceding-sibling::*/*[3]', # app_name = 'me.ele', # browse_key = {'关键字':['浏览', '指定动作奖励', '逛', '提款机'], '成功元素':['d(textContains="浏览完成")', 'd(description="任务完成")','d(text="任务完成")'], '成功图片':[], '浏览次数':10}, # back_key_dict = {'d(text="每日任务列表")':"break", 'd(text="关闭每日任务弹窗")':"break",'d(text="饿了么果园")': '点击', 'd(text="领水滴")':"点击"}, # get_rewards=['d(text="领取")', 'd(text="领奖")',"饿了吗果园领取"], # exclude_list=['外卖实付', '3个店铺'], # task_name='饿了么果园', # ) # ret = self.XPathElement(self.d, '(//*[@text="去完成"])[1]/../preceding-sibling::*/*[1]') # # ret = self.click_element_by_xpath('(//*[@text="去完成"])[1]/../preceding-sibling::*/*[1]', info=True) # print(ret.info['text']) # # ret.click() # # elem = self.d.xpath('(//*[@text="去完成"])[1]/../preceding-sibling::*/*[1]') # center = ret['center'] # self.click(center[0], center[1]) time.sleep(50) # print("结束11111") # ret = self.yzm(times=20,swip=True) # print(ret) # from test2 import toast_get_message # toast_get_message(2) # ret = self.calculate_sliding_distance(self.d(description="全场包邮")) # print(ret) exit() # self.clear_app("com.jifen.qukan") # self.open_phone() time.sleep(30) exit() def test_uia(): Test('192.168.31.161') if __name__ == '__main__': import sys # from utils.pytest_util import do_pytest_func # do_pytest_func(script=__file__ + '::test_uia', name='测试', ip_list_t=1)arg = sys.argv arg = sys.argv ip = '192.168.31.161' if len(arg) >= 2: ip = str(arg[1]) if len(arg) >= 3: port = str(arg[2]) else: port = ":5555" Test(ip=ip, this_port=port) # Test('yys.zone', this_port=":1616")