ubuntu/linux 服务器操作面板
立即下载
资源介绍:
ubuntu/linux 服务器操作面板
from flask import Flask, request, redirect, url_for, render_template, send_from_directory, abort, flash
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import os
import mimetypes
from docx import Document
import subprocess
import logging
import sys
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'uploads'
app.config['MAX_CONTENT_PATH'] = 16 * 1024 * 1024 # 最大文件大小:16MB
app.secret_key = 'supersecretkey' # 用于flash消息
# 确保上传文件夹存在
if not os.path.exists(app.config['UPLOAD_FOLDER']):
os.makedirs(app.config['UPLOAD_FOLDER'])
# 配置调度器
scheduler = BackgroundScheduler()
scheduler.start()
# 配置日志
log_filename = os.path.join(app.config['UPLOAD_FOLDER'], 'scheduler.log')
# 创建日志记录器
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # 设置为最低级别,以便所有级别的日志都可以被记录
# 创建文件处理器并指定编码
file_handler = logging.FileHandler(log_filename, encoding='utf-8')
file_handler.setLevel(logging.DEBUG)
# 创建日志格式器
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
file_handler.setFormatter(formatter)
# 确保处理器唯一
if not logger.hasHandlers():
logger.addHandler(file_handler)
def enable_logging():
logger.setLevel(logging.DEBUG)
def disable_logging():
logger.setLevel(logging.CRITICAL)
names={}
@app.route('/')
def index():
files = os.listdir(app.config['UPLOAD_FOLDER'])
jobs = scheduler.get_jobs()
return render_template('index.html', files=files, jobs=jobs,names=names)
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
file = request.files['file']
if file:
filename = file.filename
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return redirect(url_for('index'))
return render_template('upload.html')
@app.route('/uploads/')
def uploaded_file(filename):
return send_from_directory(app.config['UPLOAD_FOLDER'], filename)
@app.route('/delete/', methods=['POST'])
def delete_file(filename):
try:
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(file_path):
os.remove(file_path)
except:
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if os.path.exists(folder_path) and os.path.isdir(folder_path):
os.rmdir(folder_path)
flash(f'文件夹 {filename} 已删除', 'success')
else:
flash(f'文件夹 {filename} 不存在', 'error')
return redirect(url_for('index'))
@app.route('/view/')
def view_file(filename):
file_path = os.path.join(app.config['UPLOAD_FOLDER'], filename)
if not os.path.exists(file_path):
abort(404)
mimetype, _ = mimetypes.guess_type(file_path)
if mimetype and mimetype.startswith('text') or filename.endswith('.log'):
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
return render_template('view_text.html', content=content, filename=filename)
elif mimetype and mimetype.startswith('image'):
return render_template('view_image.html', filename=filename)
elif mimetype == 'application/pdf':
return render_template('view_pdf.html', filename=filename)
elif mimetype in ['application/msword', 'application/vnd.openxmlformats-officedocument.wordprocessingml.document']:
if filename.endswith('.docx'):
content = read_docx(file_path)
elif filename.endswith('.doc'):
content = convert_doc_to_text(file_path)
return render_template('view_text.html', content=content, filename=filename)
else:
return redirect(url_for('uploaded_file', filename=filename))
@app.route('/command', methods=['GET', 'POST'])
def command():
if request.method == 'POST':
cmd = request.form['cmd']
try:
result = subprocess.check_output(cmd, shell=True, stderr=subprocess.STDOUT, universal_newlines=True)
except subprocess.CalledProcessError as e:
result = e.output
return render_template('command.html', cmd=cmd, result=result)
return render_template('command.html')
@app.route('/schedule', methods=['GET', 'POST'])
def schedule():
if request.method == 'POST':
schedule_type = request.form['schedule_type']
script = request.form['script']
if schedule_type == 'interval':
interval_seconds = int(request.form['interval_seconds'])
job = scheduler.add_job(run_script, IntervalTrigger(seconds=interval_seconds), args=[script])
names[job.id]=(script)
flash(f'Scheduled {script} to run every {interval_seconds} seconds.', 'success')
elif schedule_type == 'cron':
hour = request.form['hour']
minute = request.form['minute']
job = scheduler.add_job(run_script, CronTrigger(hour=hour, minute=minute), args=[script])
names[job.id]=(script)
flash(f'Scheduled {script} to run at {hour}:{minute} every day.', 'success')
return redirect(url_for('index'))
scripts = [f for f in os.listdir(app.config['UPLOAD_FOLDER']) if f.endswith('.py')]
return render_template('schedule.html', scripts=scripts)
@app.route('/remove_job/', methods=['POST'])
def remove_job(job_id):
scheduler.remove_job(job_id)
flash(f'Removed job {job_id}.', 'success')
return redirect(url_for('index'))
@app.route('/create_file', methods=['POST'])
def create_file():
file_name = request.form['file_name']
if not file_name:
flash('文件名不能为空', 'error')
return redirect(url_for('index'))
file_path = os.path.join(app.config['UPLOAD_FOLDER'], file_name)
with open(file_path, 'w') as f:
f.write('')
flash(f'文件 {file_name} 已创建', 'success')
return redirect(url_for('index'))
@app.route('/create_folder', methods=['POST'])
def create_folder():
folder_name = request.form['folder_name']
if not folder_name:
flash('文件夹名不能为空', 'error')
return redirect(url_for('index'))
folder_path = os.path.join(app.config['UPLOAD_FOLDER'], folder_name)
os.makedirs(folder_path, exist_ok=True)
flash(f'文件夹 {folder_name} 已创建', 'success')
return redirect(url_for('index'))
def run_script(script):
try:
script_path = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'], script))
result = subprocess.check_output(['python', script_path], stderr=subprocess.STDOUT, universal_newlines=True, encoding='utf-8', errors='ignore')
enable_logging()
logger.info(f"Script {script} executed successfully: {result}")
disable_logging()
except subprocess.CalledProcessError as e:
enable_logging()
logger.error(f"Error executing script {script}: {e.output}")
disable_logging()
except UnicodeDecodeError as e:
enable_logging()
logger.error(f"Unicode decoding error: {e}")
disable_logging()
except FileNotFoundError as e:
enable_logging()
logger.error(f"File not found: {e}")
disable_logging()
def read_docx(file_path):
doc = Document(file_path)
full_text = []
for para in doc.paragraphs:
full_text.append(para.text)
return '\n'.join(full_text)
def convert_doc_to_text(file_path):
# 使用libreoffice将.doc文件转换为.txt
txt_path = file_path + '.txt'
subprocess.run(['libreoff