init
This commit is contained in:
@@ -0,0 +1,45 @@
|
||||
import asyncio
|
||||
import base64
|
||||
from typing import Callable
|
||||
|
||||
from openai import AsyncOpenAI
|
||||
|
||||
from .config import AICfg
|
||||
|
||||
|
||||
class AIClient:
|
||||
def __init__(self, cfg: AICfg):
|
||||
kwargs: dict = {"api_key": cfg.api_key or "sk-no-key"}
|
||||
if cfg.base_url:
|
||||
kwargs["base_url"] = cfg.base_url
|
||||
self._client = AsyncOpenAI(**kwargs)
|
||||
self._cfg = cfg
|
||||
|
||||
async def analyze(self, image_path: str, on_chunk: Callable[[str], None]) -> None:
|
||||
with open(image_path, "rb") as f:
|
||||
img_b64 = base64.b64encode(f.read()).decode()
|
||||
data_url = f"data:image/png;base64,{img_b64}"
|
||||
|
||||
stream = await self._client.chat.completions.create(
|
||||
model=self._cfg.model,
|
||||
max_tokens=self._cfg.max_tokens,
|
||||
stream=True,
|
||||
messages=[
|
||||
{"role": "system", "content": self._cfg.system_prompt},
|
||||
{
|
||||
"role": "user",
|
||||
"content": [
|
||||
{"type": "image_url", "image_url": {"url": data_url}},
|
||||
{"type": "text", "text": "请分析截图中的题目,给出答案和解析。"},
|
||||
],
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
async for chunk in stream:
|
||||
delta = chunk.choices[0].delta.content
|
||||
if delta:
|
||||
on_chunk(delta)
|
||||
|
||||
def analyze_sync(self, image_path: str, on_chunk: Callable[[str], None]) -> None:
|
||||
asyncio.run(self.analyze(image_path, on_chunk))
|
||||
@@ -0,0 +1,114 @@
|
||||
import os
|
||||
import sys
|
||||
import tomllib
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
|
||||
@dataclass
|
||||
class HotkeyCfg:
|
||||
capture: str = "<ctrl>+<shift>+a"
|
||||
toggle: str = "<ctrl>+<shift>+h"
|
||||
|
||||
|
||||
@dataclass
|
||||
class OverlayCfg:
|
||||
width: int = 460
|
||||
height: int = 360
|
||||
margin_right: int = 20
|
||||
margin_bottom: int = 40
|
||||
alpha: float = 0.93
|
||||
font_size: int = 13
|
||||
font_family: str = "Courier"
|
||||
bg_color: str = "#0f0f1a"
|
||||
fg_color: str = "#e2e2e2"
|
||||
accent_color: str = "#4ade80"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScreenshotCfg:
|
||||
delay_ms: int = 200
|
||||
tmp_path: str = "/tmp/support_agent_screenshot.png"
|
||||
|
||||
|
||||
@dataclass
|
||||
class AICfg:
|
||||
api_key: str = ""
|
||||
base_url: str = ""
|
||||
model: str = "gpt-4o"
|
||||
max_tokens: int = 1500
|
||||
system_prompt: str = "你是一个答题助手,请分析截图中的题目,给出答案和解析。"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Config:
|
||||
hotkeys: HotkeyCfg = field(default_factory=HotkeyCfg)
|
||||
overlay: OverlayCfg = field(default_factory=OverlayCfg)
|
||||
screenshot: ScreenshotCfg = field(default_factory=ScreenshotCfg)
|
||||
ai: AICfg = field(default_factory=AICfg)
|
||||
|
||||
|
||||
def _find_config_toml() -> Path | None:
|
||||
candidates = [
|
||||
Path.cwd() / "config.toml",
|
||||
Path(__file__).parent.parent / "config.toml",
|
||||
Path.home() / ".config" / "support-agent" / "config.toml",
|
||||
]
|
||||
for p in candidates:
|
||||
if p.exists():
|
||||
return p
|
||||
return None
|
||||
|
||||
|
||||
def load_config() -> Config:
|
||||
load_dotenv()
|
||||
|
||||
cfg = Config()
|
||||
|
||||
toml_path = _find_config_toml()
|
||||
if toml_path:
|
||||
with open(toml_path, "rb") as f:
|
||||
data = tomllib.load(f)
|
||||
|
||||
if "hotkeys" in data:
|
||||
h = data["hotkeys"]
|
||||
cfg.hotkeys.capture = h.get("capture", cfg.hotkeys.capture)
|
||||
cfg.hotkeys.toggle = h.get("toggle", cfg.hotkeys.toggle)
|
||||
|
||||
if "overlay" in data:
|
||||
o = data["overlay"]
|
||||
cfg.overlay.width = o.get("width", cfg.overlay.width)
|
||||
cfg.overlay.height = o.get("height", cfg.overlay.height)
|
||||
cfg.overlay.margin_right = o.get("margin_right", cfg.overlay.margin_right)
|
||||
cfg.overlay.margin_bottom = o.get("margin_bottom", cfg.overlay.margin_bottom)
|
||||
cfg.overlay.alpha = o.get("alpha", cfg.overlay.alpha)
|
||||
cfg.overlay.font_size = o.get("font_size", cfg.overlay.font_size)
|
||||
cfg.overlay.font_family = o.get("font_family", cfg.overlay.font_family)
|
||||
cfg.overlay.bg_color = o.get("bg_color", cfg.overlay.bg_color)
|
||||
cfg.overlay.fg_color = o.get("fg_color", cfg.overlay.fg_color)
|
||||
cfg.overlay.accent_color = o.get("accent_color", cfg.overlay.accent_color)
|
||||
|
||||
if "screenshot" in data:
|
||||
s = data["screenshot"]
|
||||
cfg.screenshot.delay_ms = s.get("delay_ms", cfg.screenshot.delay_ms)
|
||||
cfg.screenshot.tmp_path = s.get("tmp_path", cfg.screenshot.tmp_path)
|
||||
|
||||
if "ai" in data:
|
||||
a = data["ai"]
|
||||
cfg.ai.base_url = a.get("base_url", cfg.ai.base_url)
|
||||
cfg.ai.model = a.get("model", cfg.ai.model)
|
||||
cfg.ai.max_tokens = a.get("max_tokens", cfg.ai.max_tokens)
|
||||
cfg.ai.system_prompt = a.get("system_prompt", cfg.ai.system_prompt)
|
||||
|
||||
# 环境变量优先级高于 config.toml
|
||||
cfg.ai.api_key = os.environ.get("OPENAI_API_KEY", "")
|
||||
env_base_url = os.environ.get("OPENAI_BASE_URL", "")
|
||||
if env_base_url:
|
||||
cfg.ai.base_url = env_base_url
|
||||
|
||||
if not cfg.ai.api_key:
|
||||
print("警告:未设置 OPENAI_API_KEY,请在 .env 文件中配置", file=sys.stderr)
|
||||
|
||||
return cfg
|
||||
@@ -0,0 +1,33 @@
|
||||
import sys
|
||||
import threading
|
||||
from typing import Callable
|
||||
|
||||
from pynput import keyboard
|
||||
|
||||
from .config import HotkeyCfg
|
||||
|
||||
|
||||
class HotkeyManager:
|
||||
def __init__(self, cfg: HotkeyCfg, on_capture: Callable, on_toggle: Callable):
|
||||
self._callbacks = {
|
||||
cfg.capture: on_capture,
|
||||
cfg.toggle: on_toggle,
|
||||
}
|
||||
self._listener: keyboard.GlobalHotKeys | None = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def start(self) -> None:
|
||||
with self._lock:
|
||||
if self._listener is not None:
|
||||
return
|
||||
try:
|
||||
self._listener = keyboard.GlobalHotKeys(self._callbacks)
|
||||
self._listener.start()
|
||||
except Exception as e:
|
||||
print(f"快捷键注册失败: {e}", file=sys.stderr)
|
||||
|
||||
def stop(self) -> None:
|
||||
with self._lock:
|
||||
if self._listener:
|
||||
self._listener.stop()
|
||||
self._listener = None
|
||||
@@ -0,0 +1,58 @@
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
|
||||
from .config import load_config
|
||||
from .screenshot import ScreenshotCapture
|
||||
from .ai_client import AIClient
|
||||
from .overlay import OverlayWindow
|
||||
from .hotkey import HotkeyManager
|
||||
|
||||
|
||||
class SupportAgent:
|
||||
def __init__(self) -> None:
|
||||
self._cfg = load_config()
|
||||
self._overlay = OverlayWindow(self._cfg.overlay)
|
||||
self._screenshot = ScreenshotCapture(self._cfg.screenshot)
|
||||
self._ai = AIClient(self._cfg.ai)
|
||||
self._hotkeys = HotkeyManager(
|
||||
cfg=self._cfg.hotkeys,
|
||||
on_capture=self._on_capture,
|
||||
on_toggle=self._overlay.toggle,
|
||||
)
|
||||
self._analyzing = False
|
||||
|
||||
def _on_capture(self) -> None:
|
||||
if self._analyzing:
|
||||
return
|
||||
threading.Thread(target=self._capture_and_analyze, daemon=True).start()
|
||||
|
||||
def _capture_and_analyze(self) -> None:
|
||||
self._analyzing = True
|
||||
try:
|
||||
self._overlay.set_loading()
|
||||
image_path = self._screenshot.capture()
|
||||
self._ai.analyze_sync(image_path, self._overlay.append_text)
|
||||
except Exception as e:
|
||||
self._overlay.append_text(f"\n\n⚠ 错误:{e}")
|
||||
finally:
|
||||
self._analyzing = False
|
||||
|
||||
def run(self) -> None:
|
||||
self._hotkeys.start()
|
||||
print("support-agent 已启动")
|
||||
print(f" 截图分析: {self._cfg.hotkeys.capture}")
|
||||
print(f" 显示/隐藏: {self._cfg.hotkeys.toggle}")
|
||||
print("按 Ctrl+C 退出")
|
||||
signal.signal(signal.SIGINT, signal.SIG_DFL)
|
||||
app = self._overlay.app
|
||||
exit_code = app.exec()
|
||||
sys.exit(exit_code)
|
||||
|
||||
|
||||
def main() -> None:
|
||||
SupportAgent().run()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -0,0 +1,208 @@
|
||||
from PyQt6.QtWidgets import QApplication, QWidget, QVBoxLayout, QHBoxLayout, QTextEdit, QLabel
|
||||
from PyQt6.QtCore import Qt, pyqtSignal, QPoint
|
||||
from PyQt6.QtGui import QColor, QTextCursor, QMouseEvent
|
||||
|
||||
from .config import OverlayCfg
|
||||
|
||||
_TITLE = "AI 答题助手"
|
||||
_LOADING_TEXT = "正在分析截图,请稍候..."
|
||||
|
||||
|
||||
class _CloseBtn(QLabel):
|
||||
def __init__(self, on_click, parent=None):
|
||||
super().__init__("×", parent)
|
||||
self._on_click = on_click
|
||||
self.setCursor(Qt.CursorShape.PointingHandCursor)
|
||||
|
||||
def mousePressEvent(self, event) -> None:
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
self._on_click()
|
||||
|
||||
|
||||
class _TitleBar(QWidget):
|
||||
def __init__(self, win: "OverlayWindow", cfg: OverlayCfg):
|
||||
super().__init__(win)
|
||||
self._win = win
|
||||
self.setFixedHeight(24)
|
||||
self.setStyleSheet(f"background-color: {cfg.bg_color}; border-radius: 6px 6px 0 0;")
|
||||
|
||||
self.setMouseTracking(True)
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_StyledBackground, True)
|
||||
|
||||
self._drag_pos = None
|
||||
|
||||
layout = QHBoxLayout(self)
|
||||
layout.setContentsMargins(10, 0, 6, 0)
|
||||
|
||||
title_lbl = QLabel(f"· {_TITLE}")
|
||||
title_lbl.setStyleSheet(f"color: {cfg.accent_color}; font-size: 11px;")
|
||||
# 让标签对鼠标透明,事件穿透到 _TitleBar
|
||||
title_lbl.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents)
|
||||
|
||||
close_btn = _CloseBtn(win._on_hide)
|
||||
close_btn.setStyleSheet(f"color: {cfg.accent_color}; padding: 0 4px; font-size: 14px;")
|
||||
|
||||
layout.addWidget(title_lbl)
|
||||
layout.addStretch()
|
||||
layout.addWidget(close_btn)
|
||||
|
||||
def mousePressEvent(self, event):
|
||||
if event.button() == Qt.MouseButton.LeftButton:
|
||||
# 👇 就是在这里用 frameGeometry()
|
||||
self._drag_pos = event.globalPosition().toPoint() - self._win.frameGeometry().topLeft()
|
||||
|
||||
def mouseMoveEvent(self, event):
|
||||
if self._drag_pos is not None and event.buttons() & Qt.MouseButton.LeftButton:
|
||||
self._win.move(event.globalPosition().toPoint() - self._drag_pos)
|
||||
|
||||
def mouseReleaseEvent(self, event):
|
||||
self._drag_pos = None
|
||||
|
||||
|
||||
|
||||
class OverlayWindow(QWidget):
|
||||
_sig_append = pyqtSignal(str)
|
||||
_sig_loading = pyqtSignal()
|
||||
_sig_clear = pyqtSignal()
|
||||
_sig_show = pyqtSignal()
|
||||
_sig_hide = pyqtSignal()
|
||||
_sig_toggle = pyqtSignal()
|
||||
|
||||
def __init__(self, cfg: OverlayCfg):
|
||||
self._app = QApplication.instance() or QApplication([])
|
||||
super().__init__()
|
||||
self._cfg = cfg
|
||||
self._drag_pos: QPoint | None = None
|
||||
|
||||
self._sig_append.connect(self._on_append)
|
||||
self._sig_loading.connect(self._on_loading)
|
||||
self._sig_clear.connect(self._on_clear)
|
||||
self._sig_show.connect(self._on_show)
|
||||
self._sig_hide.connect(self._on_hide)
|
||||
self._sig_toggle.connect(self._on_toggle)
|
||||
|
||||
self._setup_window()
|
||||
self._setup_ui()
|
||||
self._position_bottom_right()
|
||||
self.show()
|
||||
|
||||
def _setup_window(self) -> None:
|
||||
self.setWindowFlags(
|
||||
Qt.WindowType.WindowStaysOnTopHint |
|
||||
Qt.WindowType.Tool
|
||||
)
|
||||
self.setAttribute(Qt.WidgetAttribute.WA_TranslucentBackground)
|
||||
self.setWindowOpacity(self._cfg.alpha)
|
||||
self.resize(self._cfg.width, self._cfg.height)
|
||||
|
||||
def _setup_ui(self) -> None:
|
||||
cfg = self._cfg
|
||||
outer = QVBoxLayout(self)
|
||||
outer.setContentsMargins(0, 0, 0, 0)
|
||||
|
||||
container = QWidget()
|
||||
container.setObjectName("container")
|
||||
container.setStyleSheet(f"""
|
||||
#container {{
|
||||
background-color: {cfg.bg_color};
|
||||
border-radius: 6px;
|
||||
}}
|
||||
""")
|
||||
outer.addWidget(container)
|
||||
|
||||
inner = QVBoxLayout(container)
|
||||
inner.setContentsMargins(0, 0, 0, 0)
|
||||
inner.setSpacing(0)
|
||||
|
||||
inner.addWidget(_TitleBar(self, cfg))
|
||||
|
||||
self._text = QTextEdit()
|
||||
self._text.setReadOnly(True)
|
||||
self._text.setStyleSheet(f"""
|
||||
QTextEdit {{
|
||||
background-color: {cfg.bg_color};
|
||||
color: {cfg.fg_color};
|
||||
border: none;
|
||||
padding: 6px 8px;
|
||||
font-size: 13px;
|
||||
}}
|
||||
QScrollBar:vertical {{
|
||||
background: transparent;
|
||||
width: 4px;
|
||||
border: none;
|
||||
}}
|
||||
QScrollBar::handle:vertical {{
|
||||
background: {cfg.accent_color};
|
||||
border-radius: 2px;
|
||||
}}
|
||||
QScrollBar::add-line:vertical, QScrollBar::sub-line:vertical {{
|
||||
height: 0;
|
||||
}}
|
||||
""")
|
||||
self._text.setAttribute(Qt.WidgetAttribute.WA_TransparentForMouseEvents)
|
||||
inner.addWidget(self._text)
|
||||
|
||||
def _position_bottom_right(self) -> None:
|
||||
cfg = self._cfg
|
||||
screen = QApplication.primaryScreen().availableGeometry()
|
||||
x = screen.width() - cfg.width - cfg.margin_right
|
||||
y = screen.height() - cfg.height - cfg.margin_bottom
|
||||
self.move(x, y)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 公开 API
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def set_loading(self) -> None:
|
||||
self._sig_loading.emit()
|
||||
|
||||
def append_text(self, chunk: str) -> None:
|
||||
self._sig_append.emit(chunk)
|
||||
|
||||
def clear(self) -> None:
|
||||
self._sig_clear.emit()
|
||||
|
||||
def toggle(self) -> None:
|
||||
self._sig_toggle.emit()
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# 槽
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _on_loading(self) -> None:
|
||||
self._text.clear()
|
||||
self._text.setTextColor(QColor(self._cfg.accent_color))
|
||||
self._text.insertPlainText(_LOADING_TEXT)
|
||||
self._text.setTextColor(QColor(self._cfg.fg_color))
|
||||
self._on_show()
|
||||
|
||||
def _on_append(self, chunk: str) -> None:
|
||||
current = self._text.toPlainText()
|
||||
if current in (_LOADING_TEXT, _LOADING_TEXT + "\n"):
|
||||
self._text.clear()
|
||||
cursor = self._text.textCursor()
|
||||
cursor.movePosition(QTextCursor.MoveOperation.End)
|
||||
self._text.setTextCursor(cursor)
|
||||
self._text.insertPlainText(chunk)
|
||||
self._text.ensureCursorVisible()
|
||||
|
||||
def _on_clear(self) -> None:
|
||||
self._text.clear()
|
||||
|
||||
def _on_show(self) -> None:
|
||||
self.show()
|
||||
self.raise_()
|
||||
|
||||
def _on_hide(self) -> None:
|
||||
self.hide()
|
||||
|
||||
def _on_toggle(self) -> None:
|
||||
if self.isVisible():
|
||||
self.hide()
|
||||
else:
|
||||
self.show()
|
||||
self.raise_()
|
||||
|
||||
@property
|
||||
def app(self) -> QApplication:
|
||||
return self._app
|
||||
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
import shutil
|
||||
import subprocess
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
from .config import ScreenshotCfg
|
||||
|
||||
|
||||
class ScreenshotCapture:
|
||||
def __init__(self, cfg: ScreenshotCfg):
|
||||
self._cfg = cfg
|
||||
|
||||
def capture(self) -> str:
|
||||
"""截取全屏,返回保存路径。自动选择当前平台最佳方式。"""
|
||||
if self._cfg.delay_ms > 0:
|
||||
time.sleep(self._cfg.delay_ms / 1000)
|
||||
|
||||
out_path = self._cfg.tmp_path
|
||||
Path(out_path).parent.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if sys.platform == "darwin":
|
||||
self._capture_macos(out_path)
|
||||
else:
|
||||
self._capture_linux(out_path)
|
||||
|
||||
if not Path(out_path).exists():
|
||||
raise RuntimeError(f"截图文件未生成: {out_path}")
|
||||
return out_path
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# macOS
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _capture_macos(self, out_path: str) -> None:
|
||||
# screencapture -x 静默截全屏(无声音/闪烁)
|
||||
subprocess.run(["screencapture", "-x", out_path], check=True)
|
||||
|
||||
# ------------------------------------------------------------------
|
||||
# Linux(按优先级尝试)
|
||||
# ------------------------------------------------------------------
|
||||
|
||||
def _capture_linux(self, out_path: str) -> None:
|
||||
wayland = os.environ.get("WAYLAND_DISPLAY")
|
||||
|
||||
if wayland:
|
||||
# Wayland 优先:grim(轻量)> spectacle > gnome-screenshot
|
||||
if shutil.which("grim"):
|
||||
subprocess.run(["grim", out_path], check=True)
|
||||
return
|
||||
if shutil.which("spectacle"):
|
||||
# spectacle -b 后台 -f 全屏 -n 无通知 -o 输出路径
|
||||
# 丢弃 stderr(Tesseract 未安装的无害警告)
|
||||
subprocess.run(
|
||||
["spectacle", "-b", "-f", "-n", "-o", out_path],
|
||||
check=True,
|
||||
timeout=10,
|
||||
stderr=subprocess.DEVNULL,
|
||||
)
|
||||
return
|
||||
if shutil.which("gnome-screenshot"):
|
||||
subprocess.run(["gnome-screenshot", "-f", out_path], check=True)
|
||||
return
|
||||
# 最后尝试 mss(可能在 XWayland 下有效)
|
||||
self._capture_mss(out_path)
|
||||
else:
|
||||
# X11:mss 最快,失败则用 scrot
|
||||
try:
|
||||
self._capture_mss(out_path)
|
||||
except Exception:
|
||||
if shutil.which("scrot"):
|
||||
subprocess.run(["scrot", out_path], check=True)
|
||||
elif shutil.which("import"): # ImageMagick
|
||||
subprocess.run(["import", "-window", "root", out_path], check=True)
|
||||
else:
|
||||
raise RuntimeError("未找到可用的截图工具(scrot / import)")
|
||||
|
||||
def _capture_mss(self, out_path: str) -> None:
|
||||
import mss
|
||||
import mss.tools
|
||||
|
||||
with mss.mss() as sct:
|
||||
monitor = sct.monitors[0] # 0 = 全部显示器合并
|
||||
screenshot = sct.grab(monitor)
|
||||
mss.tools.to_png(screenshot.rgb, screenshot.size, output=out_path)
|
||||
Reference in New Issue
Block a user