init
Some checks failed
Build and Release / Build on macos-latest (push) Has been cancelled
Build and Release / Build on ubuntu-latest (push) Has been cancelled
Build and Release / Build on windows-latest (push) Has been cancelled
Build and Release / release (push) Has been cancelled

This commit is contained in:
Matt.Ma
2025-09-09 15:06:18 +08:00
commit 6d149cefed
6 changed files with 499 additions and 0 deletions

328
rsync-tui.py Normal file
View File

@@ -0,0 +1,328 @@
# rsync-tui: 远程文件管理与断点续传 TUI 工具
# Author: Matt (https://github.com/你的用户名)
# License: MIT
#
# 一个基于 prompt_toolkit 的跨平台远程文件管理器,支持 SSH 目录浏览、批量 rsync 下载、断点续传、实时进度输出。
import argparse
import asyncio
import os
import re
import signal
import subprocess
import sys
import threading
from prompt_toolkit.application import Application
from prompt_toolkit.formatted_text import FormattedText
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout import Layout, HSplit, VSplit, Window
from prompt_toolkit.layout.controls import FormattedTextControl
from prompt_toolkit.styles import Style
from prompt_toolkit.widgets import Frame
# 检查远端是否安装rsync如无则自动安装
def check_and_install_rsync(user, host, port):
"""检查远端是否安装rsync如无则自动尝试安装。"""
check_cmd = [
'ssh', '-p', str(port), f'{user}@{host}', 'command -v rsync'
]
result = subprocess.run(check_cmd, capture_output=True, text=True)
if result.returncode == 0:
return True
# 尝试自动安装支持apt/yum
for install in [
'sudo apt-get update && sudo apt-get install -y rsync',
'sudo yum install -y rsync'
]:
install_cmd = [
'ssh', '-p', str(port), f'{user}@{host}', install
]
res = subprocess.run(install_cmd, capture_output=True, text=True)
if res.returncode == 0:
return True
print('自动安装rsync失败请手动安装')
sys.exit(1)
def get_remote_home(user, host, port):
"""获取远程主机的家目录。"""
ssh_cmd = [
'ssh', '-p', str(port), f'{user}@{host}', 'echo $HOME'
]
result = subprocess.run(ssh_cmd, capture_output=True, text=True)
if result.returncode == 0:
return os.path.normpath(result.stdout.strip())
return '/'
# 解析ls输出
def parse_ls_output(output):
"""解析ls -lA输出返回文件条目列表。"""
lines = output.strip().split('\n')
entries = []
for line in lines:
if line.startswith('total'):
continue
parts = line.split(None, 7)
if len(parts) < 8:
continue
perms, links, user, group, size, date, time, name = parts
ftype = perms[0]
entries.append({
'ftype': ftype,
'perms': perms,
'user': user,
'group': group,
'size': size,
'date': date,
'time': time,
'name': name
})
return entries
# 获取远程目录内容
def get_entries(user, host, port, path):
"""获取远程目录内容,返回文件条目列表。"""
ls_bins = ['/bin/ls', '/usr/bin/ls', 'ls']
ls_cmds = []
for bin in ls_bins:
ls_cmds.append(f'{bin} -lA --time-style=long-iso {path}')
ls_cmds.append(f'{bin} -lA {path}')
result = None
for ls_cmd in ls_cmds:
ssh_cmd = [
'ssh', '-p', str(port), f'{user}@{host}', ls_cmd
]
try:
result = subprocess.run(ssh_cmd, check=True, capture_output=True, text=True)
break
except subprocess.CalledProcessError:
result = None
continue
if not result:
return []
entries = parse_ls_output(result.stdout)
if path != '/':
entries = [{
'ftype': 'd', 'perms': 'drwxr-xr-x', 'user': '', 'group': '', 'size': '', 'date': '', 'time': '', 'name': '..'
}] + entries
return entries
async def rsync_pull(user, host, remote_path, local_path, port, follow_symlinks, set_message_threadsafe, active_procs=None, append_output=None):
"""
使用rsync拉取远程文件/目录,支持断点续传和实时进度。
"""
remote_path = os.path.normpath(remote_path)
local_path = os.path.normpath(local_path)
ssh_cmd = f"ssh -p {port} -o LogLevel=ERROR -o StreamLocalBindUnlink=yes -o ServerAliveInterval=30"
rsync_cmd = [
'stdbuf', '-oL', 'rsync', '-avz', '--progress', '--partial', '-e', ssh_cmd
]
if follow_symlinks:
rsync_cmd.append('-L')
rsync_cmd.append(f'{user}@{host}:{remote_path}')
rsync_cmd.append(local_path)
def parse_file_progress(line):
m = re.search(r'to-chk=(\d+)/(\d+)', line)
if m:
remain = int(m.group(1))
total = int(m.group(2))
done = total - remain
return f'文件进度: {done}/{total}'
return None
proc = subprocess.Popen(rsync_cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, preexec_fn=os.setsid)
if active_procs is not None:
active_procs.append(proc)
def reader():
for line in proc.stdout:
if append_output:
append_output(line.rstrip())
msg = parse_file_progress(line)
if msg:
set_message_threadsafe(msg)
t = threading.Thread(target=reader, daemon=True)
t.start()
while t.is_alive() or proc.poll() is None:
await asyncio.sleep(0.1)
t.join()
rc = proc.wait()
if active_procs is not None:
try:
active_procs.remove(proc)
except Exception:
pass
if rc == 0:
set_message_threadsafe('传输完毕')
else:
set_message_threadsafe('同步失败')
# 交互式浏览与选择
async def interactive_browse(user, host, port, start_path):
"""
主交互界面,支持远程目录浏览、文件选择、批量下载、实时进度输出。
"""
import functools
cwd = start_path
selected = 0
selected_files = set()
message = ['']
active_procs = []
output_lines = []
style = Style.from_dict({
'selected': 'reverse',
'directory': 'ansiblue',
'symlink': 'ansimagenta',
'file': '',
'marked': 'bold underline',
'message': 'bg:#444444 #ffffff',
'output': 'bg:#222222 #00ff00',
})
def get_lines(entries, selected, marked):
lines = []
for idx, entry in enumerate(entries):
t = entry['ftype']
n = entry['name']
color = 'class:file'
if t == 'd':
color = 'class:directory'
elif t == 'l':
color = 'class:symlink'
prefix = '' if idx == selected else ' '
mark = '[*] ' if n in marked else ' '
style_line = 'class:marked' if n in marked else color
if idx == selected:
lines.append(('class:selected', prefix + mark + n + '\n'))
else:
lines.append((style_line, prefix + mark + n + '\n'))
return FormattedText(lines)
entries = get_entries(user, host, port, cwd)
kb = KeyBindings()
body_control = FormattedTextControl(lambda: get_lines(entries, selected, selected_files))
message_control = FormattedTextControl(lambda: message[0], style='class:message')
class OutputControl(FormattedTextControl):
def __init__(self, lines):
super().__init__(self.get_text, style='class:output')
self.lines = lines
def get_text(self):
return '\n'.join(self.lines)
output_control = OutputControl(output_lines)
body_window = Window(content=body_control, always_hide_cursor=False)
message_window = Window(content=message_control, height=1, style='class:message')
output_window = Window(content=output_control, width=60, style='class:output', wrap_lines=True)
def refresh(entries, selected, marked):
body_control.text = get_lines(entries, selected, marked)
message_control.text = message[0]
def set_message_threadsafe(msg):
message[0] = msg
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(functools.partial(refresh, entries, selected, selected_files))
except Exception:
refresh(entries, selected, selected_files)
def append_output(line):
output_lines.append(line)
if len(output_lines) > 30:
del output_lines[0]
try:
loop = asyncio.get_running_loop()
loop.call_soon_threadsafe(app.invalidate)
except Exception:
pass
def update_entries():
nonlocal entries
entries = get_entries(user, host, port, cwd)
@kb.add('up')
def _(event):
nonlocal selected
if selected > 0:
selected -= 1
refresh(entries, selected, selected_files)
@kb.add('down')
def _(event):
nonlocal selected
if selected < len(entries) - 1:
selected += 1
refresh(entries, selected, selected_files)
@kb.add('space')
def _(event):
entry = entries[selected]
n = entry['name']
if n in selected_files:
selected_files.remove(n)
else:
selected_files.add(n)
refresh(entries, selected, selected_files)
@kb.add('enter')
def _(event):
nonlocal cwd, entries, selected
entry = entries[selected]
if entry['name'] == '..':
cwd = os.path.dirname(cwd.rstrip('/')) or '/'
update_entries()
selected = 0
refresh(entries, selected, selected_files)
elif entry['ftype'] == 'd':
cwd = os.path.join(cwd, entry['name'])
update_entries()
selected = 0
refresh(entries, selected, selected_files)
@kb.add('d')
async def _(event):
for n in selected_files:
remote_path = os.path.join(cwd, n)
local_path = '.'
follow_symlinks = False
await rsync_pull(user, host, remote_path, local_path, port, follow_symlinks, set_message_threadsafe, active_procs, append_output)
selected_files.clear()
refresh(entries, selected, selected_files)
@kb.add('<any>')
def _(event):
if message[0]:
set_message_threadsafe('')
@kb.add('q')
def _(event):
for proc in active_procs:
try:
os.killpg(os.getpgid(proc.pid), signal.SIGKILL)
except Exception:
pass
event.app.exit()
import os as _os
_os._exit(0)
root_container = Frame(HSplit([
VSplit([
body_window,
output_window
]),
message_window
]), title=lambda: f'远程目录: {cwd} (空格选中, 回车进目录, D批量下载, Q退出)')
layout = Layout(root_container)
app = Application(layout=layout, key_bindings=kb, style=style, full_screen=True)
refresh(entries, selected, selected_files)
# 定时器强制刷新UI防止output_control未被重绘
import threading as _threading
def periodic_refresh():
try:
app.invalidate()
except Exception:
pass
_threading.Timer(0.2, periodic_refresh).start()
periodic_refresh()
await app.run_async()
def main():
"""命令行入口。"""
parser = argparse.ArgumentParser(description='rsync-tui: 远程文件管理器')
parser.add_argument('host', help='远程主机IP或域名')
parser.add_argument('--user', default='root', help='ssh用户名默认root')
parser.add_argument('--port', type=int, default=22, help='ssh端口默认22')
args = parser.parse_args()
check_and_install_rsync(args.user, args.host, args.port)
home = get_remote_home(args.user, args.host, args.port)
asyncio.run(interactive_browse(args.user, args.host, args.port, home))
if __name__ == '__main__':
main()