跳转至

📜 脚本编写 — 原理与实战

先讲透原理,再给实战脚本。学完就能用。

---

📖 目录

概览:Python vs Shell 脚本对比

维度 Shell (bash) Python
本质 命令解释器,串命令 通用编程语言
擅长 文件操作、系统管理、管道 数据处理、复杂逻辑、网络编程
语法 简陋,坑多(空格引号玄学) 优雅清晰,接近伪代码
数据结构 只有字符串和弱数组 list/dict/set/tuple 随便用
错误处理 靠 $? 看返回值 try/except 完整异常机制
第三方库 几乎没有 海量 pip 包

联系: Python 可调用 Shell(subprocess.run()),Shell 可调用 Python(python3 script.py),都是解释型语言,运行在终端环境,运维场景常配合使用。

场景选择: - Shell:简单文件批量操作、系统管理、管道组合、crontab - Python:日志解析、Web 爬虫/API、复杂条件逻辑、自动化工具、>50行脚本

学习建议: Shell 是运维必备先搞熟,Python 长远更重要是运维自动化核心。建议路径:Python 基础捡回 → subprocess 模块 → 实用脚本练手。核心:Shell = 会用命令,Python = 会写程序,两个都得会。

第一部分:Shell 脚本(1-14)

编号 知识点 核心内容
1 脚本基础 Shebang、执行方式、source 区别
2 变量 特殊变量 $# $@ $? $$、参数校验
3 字符串操作 截取 ${str:0:5}、替换 ${str/old/new}、删除前后缀
4 数组 索引数组、关联数组 declare -A、批量操作
5 运算符 算术 $(( ))、文件测试 [ -f ]、逻辑组合
6 条件判断 if/elif、case 多值匹配、文件测试
7 循环 for/while/until、break、并发控制
8 函数 参数传递、返回值、local 作用域、重试机制
9 输入输出与重定向 > >> 2> &> 管道、Here Document
10 文本三剑客 grep 过滤、sed 编辑、awk 处理
11 正则表达式 BRE/ERE、贪婪/非贪婪、分组
12 文件测试与操作 -f -d -e -r -w、find、文件完整性
13 信号处理 trap、EXIT/INT/TERM、临时文件清理、回滚
14 脚本调试 set -euo pipefail、bash -x、调试日志

第二部分:Python 脚本(15-30)

编号 知识点 核心内容
15 基础语法 类型、f-string、命令行参数
16 字符串操作 split/join/replace/strip、切片
17 列表 列表推导式、map/filter、高阶函数
18 元组与集合 不可变序列、集合运算(交并差)
19 字典 字典推导式、Counter、嵌套结构
20 条件与循环 match-case、enumerate、zip
21 函数与模块 参数类型、装饰器、argparse
22 文件操作 with 语句、pathlib、批量操作
23 异常处理 try/except、自定义异常、重试装饰器
24 os 与 subprocess 系统命令、进程管理、路径操作
25 正则表达式 re 模块、命名分组、文本提取
26 JSON 与 CSV 序列化/反序列化、数据处理
27-30 综合实战项目 自动化部署、日志告警、健康检查、报告生成

🎯 学习路线


第一部分:Shell 脚本


1. 脚本基础 — Shebang 与执行方式

原理

Shell 脚本 = 把命令写进文件自动执行。第一行 #!(shebang)告诉系统用哪个解释器。

执行方式 命令 是否开子shell 变量影响当前shell
./script.sh 需要 chmod +x ✅ 是 ❌ 不影响
bash script.sh 不需要权限 ✅ 是 ❌ 不影响
source script.sh 不需要权限 ❌ 不开 ✅ 影响

关键区别: source 在当前 shell 执行,变量会留下来;其他方式开子 shell,执行完变量就丢了。

脚本文件后缀名

  • Linux 不关心后缀名,脚本能不能执行只看:① 有没有执行权限(chmod +x);② 有没有 Shebang 行(#!/bin/bash
  • .sh约定俗成,不是强制的;任何后缀甚至没后缀都能运行
  • .sh 的作用:方便人识别 + 编辑器语法高亮 + ls 一眼区分文件类型
  • 决定能否执行的是 chmod +x#!/bin/bash,跟后缀名无关

脚本文件存放目录

  • 放哪个目录没有强制要求,放哪都能运行(只要路径对)
  • PATH 环境变量决定了能不能直接用脚本名运行(不用写完整路径)
  • /usr/local/bin/:系统级,放这里所有用户都能直接运行
  • ~/bin/~/.local/bin/:用户级,只有当前用户能直接运行
  • 非 PATH 目录:必须用完整路径或相对路径 ./script.sh 才能运行
  • 关键点:脚本在不在 PATH 里的目录下,决定了能不能直接用名字运行

脚本在服务器和 Mac 上的存放位置

  • 两边都放 ~/bin/,用户级目录,Linux 和 macOS 通用
  • 操作:mkdir -p ~/bin → 复制脚本进去 → chmod +x 加执行权限
  • 验证 PATH:echo $PATH | tr ':' '\n' | grep "$HOME/bin"
  • 服务器 PATH 没有 ~/bin → 加到 ~/.bashrcexport PATH="$HOME/bin:$PATH"
  • Mac PATH 没有 ~/bin → 加到 ~/.zshrcexport PATH="$HOME/bin:$PATH"
  • 加完后 source ~/.bashrcsource ~/.zshrc 生效
  • ~/bin 在 PATH 里后,任何地方直接用脚本名运行,不用写路径

实战脚本

① 服务器初始化脚本 — 新机器一键配置基础环境

#!/bin/bash
# 服务器初始化 - 适配 RHEL/CentOS
echo "=== 系统信息 ==="
echo "主机名: $(hostname)"
echo "内核: $(uname -r)"
echo "IP: $(hostname -I | awk '{print $1}')"

echo "=== 基础配置 ==="
timedatectl set-timezone Asia/Shanghai
hostnamectl set-hostname node-$(hostname -I | awk '{print $1}' | tr '.' '-')

echo "=== 安装基础工具 ==="
yum install -y vim wget curl net-tools lsof tree htop >/dev/null 2>&1
echo "✅ 初始化完成"

② 环境变量加载器 — 用 source 保留变量到当前 shell

#!/bin/bash
# load_env.sh — source 执行,变量会留在当前 shell
export APP_HOME="/opt/myapp"
export APP_LOG="$APP_HOME/logs"
export PATH="$APP_HOME/bin:$PATH"
echo "环境已加载: APP_HOME=$APP_HOME"
source load_env.sh   # 用 source 执行
echo $APP_HOME       # /opt/myapp — 变量保留了
# 如果用 ./load_env.sh,这里读不到 APP_HOME

⚠️ Shell 脚本空格问题(重要!)

  • Linux Shell 对空格非常敏感,空格丢了就会报错
  • 必须有空格的位置[ 后面、] 前面、; 前后、命令和参数之间
  • 错误示例:if [-z "$1"];then → 正确:if [ -z "$1" ]; then
  • 错误示例:dig + short"$domain" → 正确:dig +short "$domain"
  • 错误示例:head-1 → 正确:head -1
  • 复制粘贴脚本时经常丢失空格,要特别注意检查
  • [ 其实是个命令(test 命令),后面必须空格才能正确解析

2. 变量 — 定义规则与特殊变量

原理

  • 等号两边不能有空格name="Linux" ✅ / name = "Linux"
  • 无类型声明:同一个变量可以先当字符串用,再当数字用
  • 花括号防歧义${name}_backup 不写花括号会解析成 $name_backup

特殊变量 是脚本接收外部参数的核心机制:

变量 作用 典型场景
$1~$9 位置参数 script.sh servera serverb$1=servera
$# 参数个数 判断用户是否传了足够的参数
$@ 所有参数(独立) "$@" 遍历所有参数
$? 上条命令返回值 判断命令是否成功
$$ 当前 PID 锁文件防止重复运行

实战脚本

① 带参数校验的备份脚本 — 用 $#$1 做参数检查

#!/bin/bash
# 用法: backup.sh <源目录> [目标目录]
if [ $# -lt 1 ]; then
    echo "用法: $0 <源目录> [目标目录]"
    exit 1
fi

SRC="$1"
DST="${2:-/backup/$(date +%Y%m%d)}"

if [ ! -d "$SRC" ]; then
    echo "❌ 源目录不存在: $SRC"
    exit 1
fi

mkdir -p "$DST"
tar czf "$DST/backup_$(date +%H%M%S).tar.gz" -C "$SRC" .
echo "✅ 备份完成: $DST"

② 防重复运行锁 — 用 $$$? 实现进程锁

#!/bin/bash
LOCK="/tmp/$(basename $0).lock"

if [ -f "$LOCK" ]; then
    pid=$(cat "$LOCK")
    if kill -0 "$pid" 2>/dev/null; then
        echo "脚本已在运行 (PID: $pid),退出"
        exit 1
    fi
fi

echo $$ > "$LOCK"
trap "rm -f $LOCK" EXIT

# === 你的业务逻辑 ===
echo "开始执行任务..."
sleep 10
echo "任务完成"

③ 批量服务器巡检 — 用 $@ 遍历多台服务器

#!/bin/bash
# 用法: check.sh server1 server2 server3
if [ $# -eq 0 ]; then echo "用法: $0 <host1> [host2] ..."; exit 1; fi

for host in "$@"; do
    if ping -c1 -W2 "$host" &>/dev/null; then
        echo "✅ $host 可达"
    else
        echo "❌ $host 不可达"
    fi
done

3. 字符串操作 — 截取与替换

原理

Bash 内置了字符串操作,不需要调用外部命令,速度快:

操作 语法 说明
长度 ${#str} 字符数
截取 ${str:起始:长度} 类似 Python 切片
替换 ${str/旧/新} 第一个匹配;// 全部替换
删除前缀 ${str#pattern} 最短匹配删除
删除后缀 ${str%pattern} 最短匹配删除
最长匹配 ${str##} / ${str%%} 贪婪匹配

#% 的方向: # 从左边删,% 从右边删。加一个是最短匹配,加两个是最长匹配。

实战脚本

① 自动清理过期日志 — 用 ${str%.*} 提取文件名

#!/bin/bash
LOG_DIR="/var/log/myapp"
KEEP_DAYS=7

for f in "$LOG_DIR"/*.log; do
    [ -f "$f" ] || continue
    filename="${f##*/}"          # 提取文件名: access.log
    name="${filename%.*}"        # 去掉扩展名: access
    ext="${filename##*.}"        # 取扩展名: log

    if [ $(find "$f" -mtime +$KEEP_DAYS | wc -l) -gt 0 ]; then
        mv "$f" "$LOG_DIR/archive/${name}_$(date +%Y%m%d).${ext}"
        echo "归档: $filename"
    fi
done

② 批量修改配置文件 — 用 ${var/old/new} 和 sed

#!/bin/bash
# 把配置文件中的占位符替换成实际值
CONF="/etc/myapp/config.yml"
NEW_PORT=8080
NEW_DB="192.168.1.100:3306"

sed -i "s/__PORT__/$NEW_PORT/g" "$CONF"
sed -i "s/__DB_HOST__/${NEW_DB%%:*}/g" "$CONF"    # 提取IP: 192.168.1.100
sed -i "s/__DB_PORT__/${NEW_DB##*:}/g" "$CONF"    # 提取端口: 3306
echo "✅ 配置已更新: 端口=$NEW_PORT, 数据库=$NEW_DB"

③ 路径规范化 — 确保路径末尾有/没有/

#!/bin/bash
path="/opt/myapp/"

# 去掉末尾斜杠
path="${path%/}"
echo "去掉末尾/: $path"      # /opt/myapp

# 确保末尾有斜杠
path="$path/"
echo "加上末尾/: $path"      # /opt/myapp/

4. 数组 — 索引数组与关联数组

原理

  • 索引数组:用下标访问,从 0 开始
  • 关联数组(bash 4+):用 declare -A,类似 Python 的字典
  • ${arr[@]} 展开所有元素,加双引号 "$@" 才能保留带空格的元素

为什么 "$@" 要加引号? 不加引号时,bash 会对每个元素做 word splitting。如果元素含空格,就会被拆开。

实战脚本

① 批量用户管理 — 数组存储用户信息

#!/bin/bash
# 批量创建用户并设置密码
USERS=("alice:Admin@123" "bob:Pass@456" "charlie:Secure@789")

for entry in "${USERS[@]}"; do
    name="${entry%%:*}"
    pass="${entry##*:}"

    if id "$name" &>/dev/null; then
        echo "⚠️  $name 已存在,跳过"
    else
        useradd -m "$name" && echo "$name:$pass" | chpasswd
        echo "✅ 创建用户: $name"
    fi
done

② 服务健康检查 — 关联数组存储服务和端口

#!/bin/bash
declare -A SERVICES=(
    [nginx]=80
    [mysql]=3306
    [redis]=6379
    [ssh]=22
)

echo "=== 服务健康检查 ==="
for svc in "${!SERVICES[@]}"; do
    port=${SERVICES[$svc]}
    if ss -tlnp | grep -q ":${port} "; then
        echo "✅ $svc (端口 $port) 正常"
    else
        echo "❌ $svc (端口 $port) 异常!"
    fi
done

③ 从配置文件批量读取 — 数组 + while read

#!/bin/bash
# servers.txt 内容:
# server1 192.168.1.10
# server2 192.168.1.11

declare -a SERVERS=()
while read -r name ip; do
    SERVERS+=("$name:$ip")
done < servers.txt

for srv in "${SERVERS[@]}"; do
    name="${srv%%:*}"
    ip="${srv##*:}"
    echo "检查 $name ($ip)..."
    ssh -o ConnectTimeout=3 "$ip" "uptime" 2>/dev/null || echo "  无法连接"
done

5. 运算符 — 算术与条件运算

原理

Bash 算术运算用 $(( ))(( ))只支持整数运算

场景 语法 说明
算术计算 $((a+b)) 返回计算结果
自增自减 ((a++)) 修改变量本身
条件判断 (( a > b )) 用于 if/while
字符串比较 [[ "$a" == "$b" ]] == 用双中括号
文件测试 [ -f file ] 单中括号
逻辑组合 [[ cond1 && cond2 ]] && \|\|

[ ] vs [[ ]] [[ ]] 是 bash 扩展,支持 &&||、正则匹配 =~,更安全。建议用 [[ ]]

实战脚本

① 磁盘空间告警 — 算术比较 + 条件判断

#!/bin/bash
THRESHOLD=80

for mount in / /home /var; do
    [ -d "$mount" ] || continue
    usage=$(df "$mount" | awk 'NR==2{print $5}' | tr -d '%')
    if (( usage >= THRESHOLD )); then
        echo "⚠️  警告: $mount 使用率 ${usage}% (阈值 ${THRESHOLD}%)"
    fi
done

② IP 地址验证 — 正则 + 条件判断

#!/bin/bash
validate_ip() {
    local ip=$1
    [[ $ip =~ ^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$ ]] || return 1
    IFS='.' read -r -a octets <<< "$ip"
    for octet in "${octets[@]}"; do
        (( octet >= 0 && octet <= 255 )) || return 1
    done
    return 0
}

for ip in "192.168.1.1" "256.1.1.1" "abc" "10.0.0.1"; do
    validate_ip "$ip" && echo "✅ $ip 有效" || echo "❌ $ip 无效"
done

6. 条件判断 — if / case / 文件测试

原理

if 语句的核心是条件表达式:

if [[ 条件 ]]; then
    ...
elif [[ 条件 ]]; then
    ...
else
    ...
fi

case 语句适合多值匹配,比一串 if-elif 更清晰:

case $变量 in
    模式1|模式2)  命令 ;;    # | 表示"或"
    模式*)        命令 ;;    # * 通配符
    *)            默认 ;;    # 兜底
esac

文件测试常用于检查文件状态,避免重复操作。

实战脚本

① 日志轮转脚本 — 根据文件大小自动轮转

#!/bin/bash
LOG_FILE="/var/log/myapp/app.log"
MAX_SIZE=$((50 * 1024 * 1024))   # 50MB

if [[ -f "$LOG_FILE" ]]; then
    size=$(stat -c%s "$LOG_FILE")
    if (( size >= MAX_SIZE )); then
        mv "$LOG_FILE" "${LOG_FILE}.$(date +%Y%m%d%H%M%S)"
        echo "日志已轮转: 新文件大小 $((size/1024/1024))MB"
        # 清理 30 天前的轮转日志
        find "$(dirname "$LOG_FILE")" -name "app.log.*" -mtime +30 -delete
    fi
fi

② 服务启停脚本 — case 实现 start/stop/restart

#!/bin/bash
# 用法: service.sh {start|stop|restart|status}
APP_NAME="myapp"
PID_FILE="/var/run/$APP_NAME.pid"

case "$1" in
    start)
        if [[ -f "$PID_FILE" ]] && kill -0 $(cat "$PID_FILE") 2>/dev/null; then
            echo "$APP_NAME 已在运行 (PID: $(cat "$PID_FILE"))"
        else
            nohup /opt/$APP_NAME/bin/start.sh > /dev/null 2>&1 &
            echo $! > "$PID_FILE"
            echo "✅ $APP_NAME 已启动 (PID: $!)"
        fi
        ;;
    stop)
        if [[ -f "$PID_FILE" ]]; then
            kill $(cat "$PID_FILE") && rm -f "$PID_FILE"
            echo "✅ $APP_NAME 已停止"
        else
            echo "$APP_NAME 未运行"
        fi
        ;;
    restart)
        $0 stop; sleep 1; $0 start
        ;;
    status)
        [[ -f "$PID_FILE" ]] && kill -0 $(cat "$PID_FILE") 2>/dev/null \
            && echo "运行中 (PID: $(cat "$PID_FILE"))" || echo "未运行"
        ;;
    *)  echo "用法: $0 {start|stop|restart|status}"; exit 1 ;;
esac

③ SSH 白名单管理 — 条件判断 + 文件操作

#!/bin/bash
WHITELIST="/etc/ssh/whitelist"
ACTION=$1    # add / remove / list
IP=$2

case "$ACTION" in
    add)
        [[ -z "$IP" ]] && echo "用法: $0 add <IP>" && exit 1
        grep -qxF "$IP" "$WHITELIST" 2>/dev/null || echo "$IP" >> "$WHITELIST"
        echo "✅ 已添加: $IP"
        ;;
    remove)
        [[ -z "$IP" ]] && echo "用法: $0 remove <IP>" && exit 1
        sed -i "/^${IP}$/d" "$WHITELIST"
        echo "✅ 已移除: $IP"
        ;;
    list)
        [[ -f "$WHITELIST" ]] && cat "$WHITELIST" || echo "白名单为空"
        ;;
esac

7. 循环 — for / while / until

原理

  • for:已知循环次数,或遍历列表
  • while:条件为真时持续执行
  • until:条件为假时执行(和 while 相反)
  • break N:跳出 N 层循环

for 的三种常见形式:

for i in 1 2 3; do ...; done        # 遍历列表
for ((i=0; i<10; i++)); do ...; done # C 风格
for f in /etc/*.conf; do ...; done   # 通配符遍历文件

实战脚本

① 批量主机存活检测 — 并发 ping 提高效率

#!/bin/bash
# 从文件读取主机列表,并发检测存活
check_host() {
    if ping -c1 -W2 "$1" &>/dev/null; then
        echo "✅ $1"
    else
        echo "❌ $1"
    fi
}

export -f check_host
cat servers.txt | xargs -P 20 -I {} bash -c 'check_host "$@"' _ {}
# -P 20: 20个并发

② 自动部署脚本 — while read 逐行处理

#!/bin/bash
# 从配置文件读取部署任务
# tasks.txt 格式: 操作|目标|源文件
# deploy|server1:/opt/app|dist.tar.gz

while IFS='|' read -r action target source; do
    [[ "$action" =~ ^# ]] && continue    # 跳过注释行
    [[ -z "$action" ]] && continue        # 跳过空行

    case "$action" in
        deploy)
            host="${target%%:*}"
            path="${target##*:}"
            echo "部署 $source$target"
            scp "$source" "$host:$path/"
            ssh "$host" "cd $path && tar xzf $(basename $source) && systemctl restart app"
            ;;
        restart)
            echo "重启 $target"
            ssh "$target" "systemctl restart app"
            ;;
    esac
done < tasks.txt

③ 日志实时监控 — while + tail -f

#!/bin/bash
# 监控日志中的错误并告警
tail -f /var/log/myapp/error.log | while read -r line; do
    if echo "$line" | grep -qiE "error|exception|fatal"; then
        # 防止告警风暴:同一错误 5 分钟内只告警一次
        msg=$(echo "$line" | md5sum | awk '{print $1}')
        last_alert="/tmp/alert_$msg"
        if [[ ! -f "$last_alert" ]] || (( $(date +%s) - $(stat -c%Y "$last_alert") > 300 )); then
            date +%s > "$last_alert"
            echo "[告警] $line"
            # 这里可以接入钉钉/飞书 webhook
        fi
    fi
done

8. 函数 — 参数传递与返回值

原理

Shell 函数的关键点:

  • 参数:通过 $1 $2 ... 传递,不是函数括号里的变量
  • 返回值return 只能返回 0-255(退出码),要返回字符串用 echo + 命令替换
  • 局部变量local 关键字限定作用域,避免污染全局

最佳实践:

my_func() {
    local result=""           # 局部变量
    result="计算结果"         # 赋值
    echo "$result"            # 返回字符串(通过 echo)
    return 0                  # 返回状态码
}

output=$(my_func)             # 接收字符串返回值
my_func; echo $?              # 接收状态码

实战脚本

① 带颜色的日志库 — 实用的日志函数

#!/bin/bash
# 日志库
LOG_FILE="/var/log/deploy.log"

log() {
    local level=$1; shift
    local msg="$*"
    local ts=$(date '+%Y-%m-%d %H:%M:%S')

    # 终端输出(带颜色)
    case "$level" in
        ERROR) echo -e "\033[31m[$ts] [$level] $msg\033[0m" ;;
        WARN)  echo -e "\033[33m[$ts] [$level] $msg\033[0m" ;;
        INFO)  echo -e "\033[32m[$ts] [$level] $msg\033[0m" ;;
    esac

    # 同时写入日志文件
    echo "[$ts] [$level] $msg" >> "$LOG_FILE"
}

# 使用
log INFO "部署开始"
log WARN "磁盘空间不足"
log ERROR "部署失败,已回滚"

② 带重试的命令执行 — 运维必备

#!/bin/bash
retry() {
    local max_attempts=$1; shift
    local delay=$1; shift
    local cmd="$*"

    for ((i=1; i<=max_attempts; i++)); do
        if eval "$cmd"; then
            return 0
        fi
        echo "第 $i 次尝试失败,${delay}s 后重试..."
        sleep "$delay"
    done
    echo "❌ $max_attempts 次尝试后仍然失败"
    return 1
}

# 使用
retry 3 5 "curl -sf http://api.example.com/health"

③ 配置解析器 — 函数封装通用逻辑

#!/bin/bash
# 解析 key=value 格式的配置文件
# config.ini 内容:
# host=192.168.1.1
# port=8080
# debug=true

parse_config() {
    local file=$1 key=$2 default=${3:-""}
    local value=$(grep "^${key}=" "$file" 2>/dev/null | cut -d'=' -f2-)
    echo "${value:-$default}"
}

HOST=$(parse_config config.ini host "localhost")
PORT=$(parse_config config.ini port "80")
DEBUG=$(parse_config config.ini debug "false")

echo "主机: $HOST  端口: $PORT  调试: $DEBUG"

9. 输入输出与重定向

原理

文件描述符: 0=标准输入, 1=标准输出, 2=标准错误

重定向 含义
> 覆盖写入(stdout)
>> 追加写入
2> 标准错误写入
&> stdout + stderr 都写入
2>&1 stderr 合并到 stdout
< 从文件读取输入
\| 管道:前一个的 stdout → 后一个的 stdin

Here Document 适合生成多行配置或SQL:

cat << 'EOF'     # 'EOF' 不解析变量
$HOME 不会被展开
EOF

cat << EOF       # EOF 不加引号,变量会展开
$HOME 会被展开为实际路径
EOF

实战脚本

① MySQL 自动备份 — 重定向 + Here Document

#!/bin/bash
BACKUP_DIR="/backup/mysql"
DATE=$(date +%Y%m%d_%H%M%S)
mkdir -p "$BACKUP_DIR"

# 备份所有数据库
mysqldump -u root --all-databases \
    | gzip > "$BACKUP_DIR/all_${DATE}.sql.gz"

# 生成备份报告
cat << EOF > "$BACKUP_DIR/report_${DATE}.txt"
备份时间: $(date '+%Y-%m-%d %H:%M:%S')
备份文件: all_${DATE}.sql.gz
文件大小: $(ls -lh "$BACKUP_DIR/all_${DATE}.sql.gz" | awk '{print $5}')
数据库列表:
$(mysql -u root -e "SHOW DATABASES;" -s 2>/dev/null)
EOF

echo "✅ 备份完成: $BACKUP_DIR/all_${DATE}.sql.gz"

② 并发日志收集 — 管道 + xargs

#!/bin/bash
# 从多台服务器收集日志
cat servers.txt | xargs -P 10 -I {} bash -c '
    host={}
    mkdir -p "logs/$host"
    scp -o ConnectTimeout=5 "$host:/var/log/syslog" "logs/$host/syslog_$(date +%Y%m%d).log" 2>/dev/null \
        && echo "✅ $host" || echo "❌ $host"
'

10. 文本三剑客 — grep / sed / awk

原理

三个工具的定位:

工具 定位 核心能力
grep 过滤 从大量文本中筛选出匹配的行
sed 编辑 对文本进行查找替换、删除、插入
awk 处理 按列提取数据、计算、生成报表

grep 常用模式:

grep -r "error" /var/log/       # 递归搜索
grep -c "404" access.log        # 计数
grep -B2 -A2 "error" log.txt   # 显示前后2行
grep -P '\d{4}-\d{2}' log.txt  # Perl正则(更强大)

sed 核心操作:

sed 's/pattern/replacement/g'    # 替换
sed -i.bak 's/old/new/g' file   # 原地修改并备份
sed -n '10,20p' file            # 打印第10-20行
sed '/^#/d' file                # 删除注释行
sed -i '/^$/d' file             # 删除空行

awk 核心操作:

awk '{print $1, $3}' file       # 打印第1、3列
awk -F: '{print $1}' /etc/passwd  # 指定分隔符
awk 'NR>=10 && NR<=20' file    # 打印10-20行
awk '{sum+=$1} END{print sum}' # 求和
awk '{print NR, $0}' file      # 加行号

实战脚本

① Nginx 日志分析 — awk + sort + uniq

#!/bin/bash
LOG="/var/log/nginx/access.log"

echo "=== Top 10 访问 IP ==="
awk '{print $1}' "$LOG" | sort | uniq -c | sort -rn | head -10

echo ""
echo "=== 状态码分布 ==="
awk '{print $9}' "$LOG" | sort | uniq -c | sort -rn

echo ""
echo "=== 404 页面 ==="
awk '$9==404 {print $7}' "$LOG" | sort | uniq -c | sort -rn | head -10

echo ""
echo "=== 每小时请求量 ==="
awk '{print substr($4,14,2)":00"}' "$LOG" | sort | uniq -c | sort -k2

② 批量配置修改 — sed 安全替换

#!/bin/bash
# 批量修改 sshd_config
SSHD="/etc/ssh/sshd_config"

# 备份
cp "$SSHD" "${SSHD}.bak.$(date +%Y%m%d)"

# 修改配置
sed -i 's/^#Port 22$/Port 2222/' "$SSHD"
sed -i 's/^#PermitRootLogin yes$/PermitRootLogin no/' "$SSHD"
sed -i 's/^#MaxAuthTries 6$/MaxAuthTries 3/' "$SSHD"

# 验证配置
sshd -t && echo "✅ 配置有效" || { echo "❌ 配置无效,已恢复"; cp "${SSHD}.bak."* "$SSHD"; }

③ 系统资源报告 — awk 综合

#!/bin/bash
echo "=========================================="
echo "  系统资源报告 - $(date '+%Y-%m-%d %H:%M:%S')"
echo "=========================================="

echo -e "\n--- 内存使用 ---"
free -m | awk 'NR==1{print} NR==2{printf "总计: %sMB  已用: %sMB  可用: %sMB  使用率: %.1f%%\n", $2, $3, $7, $3/$2*100}'

echo -e "\n--- 磁盘使用 ---"
df -h | awk '$5+0 > 0 {printf "%-20s 使用: %s  可用: %s  使用率: %s\n", $6, $3, $4, $5}'

echo -e "\n--- CPU 核心数 ---"
nproc

echo -e "\n--- 系统负载 ---"
uptime | awk -F'load average:' '{print "1分钟: " $2 " 5分钟: " $3 " 15分钟: " $4}'

11. 正则表达式

原理

正则表达式用于匹配文本模式,grep/sed/awk/Python 都用得到:

语法 含义 示例
. 任意单个字符 a.c → abc, aXc
* 前一字符 0+ 次 ab*c → ac, abc, abbc
+ 前一字符 1+ 次 ab+c → abc, abbc
? 前一字符 0/1 次 ab?c → ac, abc
^ 行首 ^root → 以 root 开头
$ 行尾 bash$ → 以 bash 结尾
[abc] 字符集 [aeiou] → 元音
[^abc] 排除字符集 [^0-9] → 非数字
{n,m} 重复 n~m 次 \d{3} → 3位数字
() 分组 (ab)+ → ab, abab
\| cat\|dog

BRE vs ERE: BRE 用 \{n,m\},ERE 用 {n,m}。grep 加 -E 用 ERE,sed 加 -r 用 ERE。

实战脚本

① 日志中提取关键信息 — grep + 正则

#!/bin/bash
# 提取过去1小时的 5xx 错误
grep -P '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}' access.log | \
    grep -P '\[(?!.*15:)' | \    # 这里简化:用时间过滤
    awk '$9 ~ /^5/ {print $1, $7}' | sort | uniq -c | sort -rn | head -20

② 批量验证配置文件格式 — 正则 + while

#!/bin/bash
# 检查 /etc/hosts 格式是否正确
# 正确格式: IP  主机名  [别名...]
ERRORS=0

while read -r line; do
    [[ "$line" =~ ^# ]] && continue    # 跳过注释
    [[ -z "$line" ]] && continue        # 跳过空行

    if ! echo "$line" | grep -qP '^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\s+\S+'; then
        echo "❌ 格式错误: $line"
        ((ERRORS++))
    fi
done < /etc/hosts

echo "检查完成: $ERRORS 个错误"

12. 文件测试与操作

原理

Bash 内置了丰富的文件测试运算符:

测试 含义 测试 含义
-f 是普通文件 -d 是目录
-e 存在 -L 是符号链接
-r 可读 -w 可写
-x 可执行 -s 非空
-nt 比...新 -ot 比...旧

配合 find 使用:

find /path -type f -mtime +30    # 30天前的文件
find /path -size +100M           # 大于100MB的文件
find /path -name "*.log" -exec rm {} \;  # 删除所有.log

实战脚本

① 自动备份 + 过期清理 — 文件操作综合

#!/bin/bash
SRC="/opt/data"
DST="/backup/data"
KEEP=7

mkdir -p "$DST"

# 备份(增量)
DATE=$(date +%Y%m%d)
tar czf "$DST/full_${DATE}.tar.gz" "$SRC" 2>/dev/null

# 清理过期备份
find "$DST" -name "full_*.tar.gz" -mtime +$KEEP -delete

# 报告
echo "备份大小: $(ls -lh "$DST/full_${DATE}.tar.gz" | awk '{print $5}')"
echo "保留备份数: $(ls "$DST"/full_*.tar.gz 2>/dev/null | wc -l)"

② 文件完整性检查 — md5sum + 文件测试

#!/bin/bash
# 监控关键文件是否被篡改
CHECKSUMS="/etc/file_checksums"

check_files() {
    local errors=0
    while read -r hash file; do
        if [[ ! -f "$file" ]]; then
            echo "❌ 文件丢失: $file"
            ((errors++))
        elif [[ "$(md5sum "$file" | awk '{print $1}')" != "$hash" ]]; then
            echo "⚠️  文件被修改: $file"
            ((errors++))
        fi
    done < "$CHECKSUMS"
    return $errors
}

# 生成基准校验和(首次运行)
generate_checksums() {
    for f in /etc/passwd /etc/shadow /etc/ssh/sshd_config; do
        [[ -f "$f" ]] && md5sum "$f"
    done > "$CHECKSUMS"
}

# 主逻辑
if [[ ! -f "$CHECKSUMS" ]]; then
    generate_checksums
    echo "✅ 基准校验和已生成"
else
    check_files && echo "✅ 所有文件正常" || echo "⚠️  发现异常"
fi

13. 信号处理 — trap

原理

trap 用于捕获系统信号并执行清理操作,是编写健壮脚本的关键:

信号 用途 场景
EXIT 脚本退出时 最常用,确保清理临时文件
INT Ctrl+C 用户中断
TERM kill 命令 被系统终止
HUP 终端断开 守护进程

最佳实践:EXIT 信号做清理,它会在脚本正常退出、异常退出、被 kill 时都触发。

实战脚本

① 安全的文件操作 — 临时文件 + 清理

#!/bin/bash
TMPDIR=$(mktemp -d)
trap 'rm -rf "$TMPDIR"; echo "清理完成"' EXIT

echo "临时目录: $TMPDIR"
# 在这里做文件操作...
tar xzf data.tar.gz -C "$TMPDIR"
# ...处理数据...
# 脚本退出时自动清理

② 部署脚本的回滚机制 — trap + 备份

#!/bin/bash
APP_DIR="/opt/myapp"
BACKUP="/tmp/app_backup_$(date +%s)"

# 部署前备份
cp -r "$APP_DIR" "$BACKUP"
trap 'echo "⚠️ 部署失败,回滚中..."; rm -rf "$APP_DIR"; mv "$BACKUP" "$APP_DIR"; echo "✅ 已回滚"' ERR

# 部署新版本
echo "开始部署..."
tar xzf new_version.tar.gz -C "$APP_DIR"
systemctl restart myapp

# 如果到这里没触发 trap,说明部署成功
echo "✅ 部署成功"
rm -rf "$BACKUP"
trap - ERR    # 清除 trap

14. 脚本调试

原理

Bash 调试的核心方法:

方法 用途
bash -x script.sh 执行时打印每一步
set -x 在脚本内开启调试
set -e 遇到错误立即退出
set -u 使用未定义变量时报错
set -o pipefail 管道中任一命令失败则整体失败
set -euo pipefail 严格模式,运维脚本必备

严格模式的三个参数:

  • -e:任何命令失败就停止(防止错误连锁反应)
  • -u:使用未定义变量就报错(防止拼写错误)
  • -o pipefail:管道中最后一个命令成功不代表整体成功

实战脚本

① 严格模式模板 — 运维脚本标配

#!/bin/bash
set -euo pipefail

# 你的业务逻辑
DEPLOY_DIR="/opt/app"
VERSION="${1:?用法: $0 <版本号>}"   # ${1:?} 缺参数时报错

echo "部署版本: $VERSION"
cd "$DEPLOY_DIR"
git pull origin main
git checkout "$VERSION"
npm install --production
pm2 restart app

echo "✅ 部署 $VERSION 完成"

② 调试日志函数 — 追踪脚本执行过程

#!/bin/bash
DEBUG=${DEBUG:-false}

debug() {
    [[ "$DEBUG" == "true" ]] && echo "[DEBUG $(date +%H:%M:%S)] $*" >&2
}

debug "开始执行"
debug "参数: $@"
debug "当前目录: $(pwd)"

# 你的逻辑
for host in server1 server2; do
    debug "检查 $host"
    ping -c1 -W2 "$host" &>/dev/null && debug "$host 可达" || debug "$host 不可达"
done

---

第二部分:Python 脚本


15. 基础语法 — 变量、类型、输入输出

原理

Python 是动态类型语言,变量不需要声明类型,赋值时自动推断。

核心数据类型:

类型 示例 特点
str "hello" 不可变,支持切片
int 42 任意精度,不会溢出
float 3.14 有精度限制
bool True/False 继承自 int
None None 空值,判断用 is None

f-string 是最推荐的格式化方式(Python 3.6+):

name = "Linux"
version = 6.1
print(f"{name} {version}")           # Linux 6.1
print(f"{1024*1024:,}")             # 1,048,576(千位分隔符)
print(f"{3.14159:.2f}")             # 3.14(保留2位小数)

实战脚本

① 系统信息采集 — input + print + f-string

#!/usr/bin/env python3
import platform, os, datetime

info = {
    "主机名": platform.node(),
    "系统": f"{platform.system()} {platform.release()}",
    "架构": platform.machine(),
    "Python": platform.python_version(),
    "当前用户": os.getenv("USER", "unknown"),
    "当前时间": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
}

for k, v in info.items():
    print(f"  {k}: {v}")

② 简易计算器 — 类型转换 + 条件

#!/usr/bin/env python3
import sys

def calc(a, op, b):
    ops = {
        "+": lambda: a + b,
        "-": lambda: a - b,
        "*": lambda: a * b,
        "/": lambda: a / b if b != 0 else "错误: 除零",
        "%": lambda: a % b,
    }
    return ops.get(op, lambda: "未知运算符")()

if len(sys.argv) != 4:
    print(f"用法: {sys.argv[0]} <数字> <运算符> <数字>")
    sys.exit(1)

a, op, b = float(sys.argv[1]), sys.argv[2], float(sys.argv[3])
print(f"{a} {op} {b} = {calc(a, op, b)}")

16. 字符串操作 — 方法与正则

原理

Python 字符串是不可变的,所有方法都返回新字符串:

方法 用途 方法 用途
split() 分割 join() 合并
strip() 去空白 replace() 替换
find() 查找位置 count() 计数
startswith() 前缀判断 endswith() 后缀判断
upper()/lower() 大小写 encode() 编码转换

切片语法: s[start:stop:step],和 Shell 的 ${str:0:5} 类似但更强大。

实战脚本

① 文本统计器 — 字符串方法综合

#!/usr/bin/env python3
import sys

def analyze_text(filepath):
    with open(filepath) as f:
        text = f.read()

    lines = text.splitlines()
    words = text.split()
    chars = len(text)

    print(f"文件: {filepath}")
    print(f"行数: {len(lines)}")
    print(f"词数: {len(words)}")
    print(f"字符数: {chars}")
    print(f"非空行: {sum(1 for l in lines if l.strip())}")

    # 最长行
    longest = max(lines, key=len)
    print(f"最长行: {len(longest)} 字符")

if len(sys.argv) > 1:
    analyze_text(sys.argv[1])
else:
    print("用法: text_analyzer.py <文件>")

② 批量重命名工具 — 字符串处理 + os

#!/usr/bin/env python3
import os, sys, re

def batch_rename(directory, pattern, replacement):
    """批量重命名: 支持正则"""
    count = 0
    for f in sorted(os.listdir(directory)):
        new_name = re.sub(pattern, replacement, f)
        if new_name != f:
            old_path = os.path.join(directory, f)
            new_path = os.path.join(directory, new_name)
            os.rename(old_path, new_path)
            print(f"  {f} -> {new_name}")
            count += 1
    print(f"共重命名 {count} 个文件")

if len(sys.argv) != 4:
    print("用法: rename.py <目录> <正则模式> <替换>")
    print("示例: rename.py /photos '(\\d+)' 'photo_\\1'")
    sys.exit(1)

batch_rename(sys.argv[1], sys.argv[2], sys.argv[3])

17. 列表 — 推导式与高阶操作

原理

Python 列表是有序、可变的序列,是使用最频繁的数据结构。

列表推导式是 Python 最优雅的特性之一:

# 基础
squares = [x**2 for x in range(10)]

# 带条件
evens = [x for x in range(20) if x % 2 == 0]

# 嵌套
flat = [x for row in matrix for x in row]

# 同时取值和索引
indexed = [(i, x) for i, x in enumerate(lst)]

高阶函数:

map(func, lst)        # 对每个元素应用函数
filter(func, lst)     # 筛选元素
sorted(lst, key=...)  # 自定义排序

实战脚本

① 日志分析器 — 列表推导式 + Counter

#!/usr/bin/env python3
"""分析 Nginx access.log,统计 IP、状态码、URL"""
import re
from collections import Counter

def analyze(logfile):
    ips = Counter()
    status = Counter()
    urls = Counter()
    pattern = r'(\S+) .+ "(\w+) (\S+) .+" (\d+)'

    with open(logfile) as f:
        for line in f:
            m = re.match(pattern, line)
            if m:
                ips[m.group(1)] += 1
                status[m.group(4)] += 1
                urls[m.group(3)] += 1

    print("=== Top 10 IP ===")
    for ip, count in ips.most_common(10):
        print(f"  {count:>6}  {ip}")

    print("\n=== 状态码 ===")
    for code, count in status.most_common():
        print(f"  {code}: {count}")

    print("\n=== Top 10 URL ===")
    for url, count in urls.most_common(10):
        print(f"  {count:>6}  {url}")

if __name__ == "__main__":
    import sys
    analyze(sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log")

② CSV 数据分析 — 列表 + 字典

#!/usr/bin/env python3
"""分析 CSV 数据,计算统计指标"""
import csv, sys

def analyze_csv(filepath, value_column):
    values = []
    with open(filepath) as f:
        reader = csv.DictReader(f)
        for row in reader:
            try:
                values.append(float(row[value_column]))
            except (ValueError, KeyError):
                continue

    if not values:
        print("无有效数据"); return

    print(f"列: {value_column}")
    print(f"  数量: {len(values)}")
    print(f"  总和: {sum(values):,.2f}")
    print(f"  平均: {sum(values)/len(values):,.2f}")
    print(f"  最大: {max(values):,.2f}")
    print(f"  最小: {min(values):,.2f}")

    # 中位数
    sorted_vals = sorted(values)
    mid = len(sorted_vals) // 2
    median = sorted_vals[mid] if len(sorted_vals) % 2 else (sorted_vals[mid-1] + sorted_vals[mid]) / 2
    print(f"  中位数: {median:,.2f}")

if __name__ == "__main__":
    if len(sys.argv) < 3:
        print("用法: csv_stats.py <文件> <列名>"); sys.exit(1)
    analyze_csv(sys.argv[1], sys.argv[2])

18. 元组与集合

原理

元组(tuple): 不可变列表,适合存储不应修改的数据。可做字典的键(列表不行)。

集合(set): 无序、不重复,天然去重。支持数学集合运算(交、并、差、对称差)。

操作 语法 说明
交集 a & ba.intersection(b) 两个都有的
并集 a \| ba.union(b) 至少一个有
差集 a - ba.difference(b) a 有 b 没有
对称差集 a ^ b 只在其中一个

实战脚本

① 文件差异比较 — 集合运算

#!/usr/bin/env python3
"""比较两个目录的文件差异"""
import os, sys
from pathlib import Path

def diff_dirs(dir1, dir2):
    files1 = {f.name for f in Path(dir1).rglob("*") if f.is_file()}
    files2 = {f.name for f in Path(dir2).rglob("*") if f.is_file()}

    only1 = files1 - files2
    only2 = files2 - files1
    both = files1 & files2

    print(f"仅在 {dir1}: {len(only1)} 个")
    for f in sorted(only1)[:10]: print(f"  - {f}")

    print(f"\n仅在 {dir2}: {len(only2)} 个")
    for f in sorted(only2)[:10]: print(f"  + {f}")

    print(f"\n两边都有: {len(both)} 个")

if len(sys.argv) != 3:
    print("用法: diff_dirs.py <目录1> <目录2>"); sys.exit(1)
diff_dirs(sys.argv[1], sys.argv[2])

② 数据去重保序 — 集合 + 列表

#!/usr/bin/env python3
"""读取文件去重,保持原始顺序"""
def deduplicate(filepath):
    seen = set()
    result = []
    with open(filepath) as f:
        for line in f:
            key = line.strip()
            if key and key not in seen:
                seen.add(key)
                result.append(line)

    with open(filepath, 'w') as f:
        f.writelines(result)
    print(f"去重完成: {len(seen)} 行唯一")

# deduplicate("/etc/hosts")

19. 字典 — 数据组织与处理

原理

字典是 Python 最灵活的数据结构,本质是哈希表,查找速度 O(1)。

字典推导式:

squares = {x: x**2 for x in range(6)}
# {0: 0, 1: 1, 2: 4, 3: 9, 4: 16, 5: 25}

Counter 是字典的子类,专门用于计数:

from collections import Counter
words = Counter("hello world".split())
words.most_common(2)   # [('world', 1), ('hello', 1)]

实战脚本

① 端口扫描器 — 字典 + socket

#!/usr/bin/env python3
"""简易端口扫描"""
import socket

def scan_ports(host, ports):
    results = {}
    for port in ports:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(1)
        results[port] = sock.connect_ex((host, port)) == 0
        sock.close()
    return results

common_ports = {22: "SSH", 80: "HTTP", 443: "HTTPS", 3306: "MySQL", 6379: "Redis", 8080: "HTTP-Alt"}

host = "127.0.0.1"
print(f"扫描 {host}...")
results = scan_ports(host, common_ports.keys())

for port, is_open in results.items():
    status = "✅ 开放" if is_open else "❌ 关闭"
    print(f"  {port:>5} ({common_ports[port]:>10}): {status}")

② 配置文件解析器 — 字典 + 文件操作

#!/usr/bin/env python3
"""解析 ini 格式配置文件"""
import re

def parse_ini(filepath):
    config = {}
    current_section = None

    with open(filepath) as f:
        for line in f:
            line = line.strip()
            if not line or line.startswith('#'):
                continue

            section_match = re.match(r'\[(.+)\]', line)
            if section_match:
                current_section = section_match.group(1)
                config[current_section] = {}
                continue

            if '=' in line and current_section:
                key, value = line.split('=', 1)
                config[current_section][key.strip()] = value.strip()

    return config

# config = parse_ini("/etc/myapp/config.ini")
# print(config["database"]["host"])

20. 条件与循环 — 控制流

原理

Python 的条件判断比 Shell 更直观:

# 三目运算符
result = "通过" if score >= 60 else "不通过"

# match-case(Python 3.10+)
match command:
    case "start": start_app()
    case "stop": stop_app()
    case _: print("未知命令")

循环的高级用法:

# enumerate 同时取索引和值
for i, item in enumerate(lst):
    print(f"{i}: {item}")

# zip 并行遍历
for name, score in zip(names, scores):
    print(f"{name}: {score}")

# 列表推导式替代循环
squares = [x**2 for x in range(10)]

实战脚本

① 批量 SSH 命令执行 — 循环 + subprocess

#!/usr/bin/env python3
"""批量在多台服务器执行命令"""
import subprocess, sys, concurrent.futures

def run_on_host(host, cmd):
    try:
        r = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=5", "-o", "StrictHostKeyChecking=no",
             host, cmd],
            capture_output=True, text=True, timeout=30
        )
        return host, r.returncode, r.stdout.strip()
    except subprocess.TimeoutExpired:
        return host, -1, "超时"

def main():
    if len(sys.argv) < 3:
        print("用法: batch_ssh.py <hosts.txt> <命令>"); sys.exit(1)

    with open(sys.argv[1]) as f:
        hosts = [line.strip() for line in f if line.strip()]

    cmd = " ".join(sys.argv[2:])

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(run_on_host, h, cmd): h for h in hosts}
        for future in concurrent.futures.as_completed(futures):
            host, code, output = future.result()
            status = "✅" if code == 0 else "❌"
            print(f"{status} {host}: {output[:100]}")

if __name__ == "__main__":
    main()

21. 函数与模块 — 代码复用

原理

函数是代码复用的基本单元:

def func(必需参数, 默认参数=None, *args, **kwargs):
    """文档字符串"""
    return 结果
参数类型 语法 用途
位置参数 f(a, b) 必须传
默认参数 f(a, b=10) 可选
可变位置 f(*args) 接收任意个位置参数
可变关键字 f(**kwargs) 接收任意个关键字参数

装饰器是 Python 的高级特性,用于在不修改函数代码的情况下增加功能:

import time
def timer(func):
    def wrapper(*a, **kw):
        t = time.time()
        result = func(*a, **kw)
        print(f"{func.__name__} 耗时 {time.time()-t:.3f}s")
        return result
    return wrapper

实战脚本

① 缓存装饰器 — 避免重复计算

#!/usr/bin/env python3
"""简易缓存装饰器"""
import time, functools

def cache(ttl=60):
    """带过期时间的缓存"""
    def decorator(func):
        _cache = {}
        @functools.wraps(func)
        def wrapper(*args):
            now = time.time()
            if args in _cache and now - _cache[args]['time'] < ttl:
                return _cache[args]['value']
            result = func(*args)
            _cache[args] = {'value': result, 'time': now}
            return result
        return wrapper
    return decorator

@cache(ttl=30)
def expensive_query(user_id):
    """模拟耗时查询"""
    time.sleep(1)
    return f"用户{user_id}的数据"

# 第一次调用: 耗时1秒
# 30秒内再调用: 瞬间返回(命中缓存)

② 命令行工具框架 — argparse + 函数

#!/usr/bin/env python3
"""通用命令行工具模板"""
import argparse, sys

def cmd_info(args):
    """显示系统信息"""
    import platform
    print(f"系统: {platform.system()} {platform.release()}")
    print(f"Python: {platform.python_version()}")

def cmd_check(args):
    """检查服务状态"""
    import subprocess
    for host in args.hosts:
        r = subprocess.run(["ping", "-c1", "-W2", host], capture_output=True)
        status = "✅" if r.returncode == 0 else "❌"
        print(f"{status} {host}")

def main():
    parser = argparse.ArgumentParser(description="运维工具")
    subparsers = parser.add_subparsers(dest="command", help="可用命令")

    subparsers.add_parser("info", help="系统信息")

    check_parser = subparsers.add_parser("check", help="检查主机")
    check_parser.add_argument("hosts", nargs="+", help="主机列表")

    args = parser.parse_args()

    commands = {"info": cmd_info, "check": cmd_check}
    if args.command in commands:
        commands[args.command](args)
    else:
        parser.print_help()

if __name__ == "__main__":
    main()

22. 文件操作 — 读写与路径

原理

with 语句是 Python 文件操作的标准方式,确保文件正确关闭:

# 读取
with open("file.txt") as f:
    content = f.read()         # 全部读入
    lines = f.readlines()      # 行列表
    for line in f:             # 逐行(内存友好)
        process(line)

# 写入
with open("out.txt", "w") as f:
    f.write("hello\n")

# 追加
with open("log.txt", "a") as f:
    f.write("新日志\n")

# 二进制
with open("image.jpg", "rb") as f:
    data = f.read()

pathlib 是现代路径操作的推荐方式(Python 3.4+):

from pathlib import Path
p = Path("/etc/passwd")
p.exists()           # True
p.name               # "passwd"
p.stem               # "passwd"(无扩展名)
p.suffix             # ""(无扩展名)
p.parent             # Path("/etc")
p.read_text()        # 读取内容
list(Path("/etc").glob("*.conf"))  # 列出所有.conf文件

实战脚本

① 日志分析器 — 文件操作 + 正则 + Counter

#!/usr/bin/env python3
"""Nginx 日志深度分析"""
import re, sys
from collections import Counter
from pathlib import Path

def analyze(logfile):
    pattern = re.compile(
        r'(?P<ip>\S+) .+ \[(?P<time>[^\]]+)\] '
        r'"(?P<method>\w+) (?P<path>\S+) .+" '
        r'(?P<status>\d+) (?P<size>\d+)'
    )

    data = {'ips': Counter(), 'paths': Counter(), 'status': Counter(), 'total_size': 0}

    with open(logfile) as f:
        for line in f:
            m = pattern.match(line)
            if m:
                data['ips'][m.group('ip')] += 1
                data['paths'][m.group('path')] += 1
                data['status'][m.group('status')] += 1
                data['total_size'] += int(m.group('size'))

    print(f"总请求数: {sum(data['ips'].values()):,}")
    print(f"总流量: {data['total_size']/1024/1024:.2f} MB")
    print(f"唯一IP: {len(data['ips']):,}")

    print("\nTop 10 IP:")
    for ip, count in data['ips'].most_common(10):
        print(f"  {count:>8}  {ip}")

    print("\nTop 10 页面:")
    for path, count in data['paths'].most_common(10):
        print(f"  {count:>8}  {path}")

if __name__ == "__main__":
    analyze(sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log")

23. 异常处理 — 健壮性保障

原理

Python 用 try/except/else/finally 处理异常:

try:
    result = risky_operation()
except SpecificError as e:
    handle(e)
except Exception as e:
    handle_unexpected(e)
else:
    # 没有异常时执行
    process(result)
finally:
    # 无论如何都执行
    cleanup()

异常层次:

Exception
├── ValueError      # 值错误(int("abc"))
├── TypeError       # 类型错误
├── KeyError        # 字典键不存在
├── FileNotFoundError  # 文件不存在
├── PermissionError    # 权限不足
├── ConnectionError    # 网络连接错误
└── ...

实战脚本

① 重试机制装饰器 — 网络请求必备

#!/usr/bin/env python3
"""带指数退避的重试装饰器"""
import time, functools, random

def retry(max_attempts=3, base_delay=1, exceptions=(Exception,)):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    delay = base_delay * (2 ** (attempt - 1)) + random.uniform(0, 1)
                    print(f"第{attempt}次失败: {e}, {delay:.1f}s后重试")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry(max_attempts=3, exceptions=(ConnectionError, TimeoutError))
def fetch_data(url):
    import urllib.request
    with urllib.request.urlopen(url, timeout=5) as r:
        return r.read()

# fetch_data("https://api.example.com/data")

② 安全的配置加载器 — 多层异常处理

#!/usr/bin/env python3
"""安全加载配置文件,支持多级 fallback"""
import json, os

def load_config(*paths):
    for path in paths:
        try:
            with open(path) as f:
                config = json.load(f)
                print(f"✅ 加载配置: {path}")
                return config
        except FileNotFoundError:
            print(f"⚠️  配置不存在: {path}")
        except json.JSONDecodeError as e:
            print(f"❌ 配置格式错误: {path} - {e}")
        except PermissionError:
            print(f"🔒 无权限读取: {path}")

    print("⚠️  所有配置文件均不可用,使用默认配置")
    return {"host": "localhost", "port": 8080, "debug": False}

config = load_config(
    "/etc/myapp/config.json",
    os.path.expanduser("~/.config/myapp/config.json"),
    "config.json"
)
print(f"最终配置: {config}")

24. os 与 subprocess — 系统交互

原理

os 模块提供操作系统级功能:

os.getcwd()              # 当前目录
os.listdir("/etc")       # 列目录
os.makedirs("a/b/c")    # 递归创建
os.path.exists("/tmp")   # True
os.path.join("a", "b")  # a/b(跨平台)
os.environ["HOME"]       # 环境变量

subprocess 模块用于执行系统命令:

import subprocess

# 推荐方式
r = subprocess.run(["ls", "-la"], capture_output=True, text=True)
print(r.stdout, r.returncode)

# shell 管道(慎用,有安全风险)
r = subprocess.run("ps aux | grep python", shell=True, capture_output=True, text=True)

pathlib 是现代路径操作的推荐方式:

from pathlib import Path
p = Path("/etc/passwd")
p.read_text()           # 读取
for f in Path("/etc").glob("*.conf"): print(f.name)

实战脚本

① 系统监控面板 — 多模块综合

#!/usr/bin/env python3
"""简易系统监控"""
import os, subprocess, time
from datetime import datetime
from pathlib import Path

def get_cpu_usage():
    with open("/proc/stat") as f:
        fields = f.readline().split()[1:]
    total = sum(int(x) for x in fields)
    idle = int(fields[3])
    return total, idle

def get_memory():
    with open("/proc/meminfo") as f:
        info = {}
        for line in f:
            parts = line.split()
            info[parts[0].rstrip(":")] = int(parts[1])
    total = info["MemTotal"] / 1024 / 1024
    available = info["MemAvailable"] / 1024 / 1024
    used = total - available
    return total, used, used / total * 100

def get_disk():
    result = subprocess.run(["df", "-h", "/"], capture_output=True, text=True)
    line = result.stdout.strip().split("\n")[1].split()
    return {"mount": line[5], "size": line[1], "used": line[2], "avail": line[3], "pct": line[4]}

def monitor():
    t1_total, t1_idle = get_cpu_usage()
    time.sleep(1)
    t2_total, t2_idle = get_cpu_usage()

    cpu = 100 * (1 - (t2_idle - t1_idle) / (t2_total - t1_total))
    mem_total, mem_used, mem_pct = get_memory()
    disk = get_disk()

    print(f"\r[{datetime.now():%H:%M:%S}] "
          f"CPU: {cpu:5.1f}% | "
          f"内存: {mem_used:.1f}/{mem_total:.1f}GB ({mem_pct:.0f}%) | "
          f"磁盘: {disk['used']}/{disk['size']} ({disk['pct']})",
          end="", flush=True)

if __name__ == "__main__":
    while True:
        monitor()
        time.sleep(5)

25. 正则表达式 — re 模块

原理

Python 的 re 模块提供了完整的正则表达式支持:

import re
re.search(r'\d+', text)      # 第一个匹配
re.findall(r'\d+', text)     # 所有匹配
re.sub(r'\d+', 'X', text)    # 替换
re.split(r'\s+', text)       # 分割
re.match(r'^\d+$', text)     # 从开头匹配

常用模式:

模式 含义 示例
\d 数字 \d+ 匹配 123
\w 字母/数字/下划线 \w+ 匹配 hello
\s 空白字符 \s+ 匹配空格
.* 贪婪匹配 尽可能多匹配
.*? 非贪婪匹配 尽可能少匹配
(?P<name>...) 命名分组 提取命名数据

实战脚本

① IP 地址提取与验证

#!/usr/bin/env python3
"""从文本中提取并验证 IP 地址"""
import re

def extract_ips(text):
    """提取所有 IPv4 地址"""
    pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}\b'
    return re.findall(pattern, text)

def validate_ip(ip):
    """验证 IP 地址有效性"""
    parts = ip.split('.')
    return len(parts) == 4 and all(0 <= int(p) <= 255 for p in parts)

# 使用示例
text = """
连接 192.168.1.100 成功
备用 10.0.0.55
无效 999.1.1.1
"""
ips = extract_ips(text)
for ip in ips:
    status = "✅ 有效" if validate_ip(ip) else "❌ 无效"
    print(f"  {ip}: {status}")

26. JSON 与 CSV — 数据处理

原理

JSON 是 API 和配置文件的标准格式:

import json
data = json.load(open("config.json"))           # 读文件
json.dump(data, open("out.json", "w"), indent=2)  # 写文件
s = json.dumps(data, indent=2, ensure_ascii=False)  # 转字符串
d = json.loads(s)                                # 解析字符串

CSV 是表格数据的标准格式:

import csv
for row in csv.DictReader(open("data.csv")):
    print(row["name"], row["score"])

实战脚本

① 日志转 JSON — 结构化日志

#!/usr/bin/env python3
"""将 Nginx 日志转为 JSON 格式,便于后续分析"""
import re, json, sys

def nginx_log_to_json(logfile):
    pattern = re.compile(
        r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] '
        r'"(?P<method>\w+) (?P<path>\S+) (?P<protocol>[^"]+)" '
        r'(?P<status>\d+) (?P<size>\d+)'
    )

    for line in logfile:
        m = pattern.match(line)
        if m:
            record = m.groupdict()
            record['size'] = int(record['size'])
            record['status'] = int(record['status'])
            print(json.dumps(record, ensure_ascii=False))

if __name__ == "__main__":
    with open(sys.argv[1] if len(sys.argv) > 1 else "/var/log/nginx/access.log") as f:
        nginx_log_to_json(f)

27-30. 综合实战项目

27. 自动化部署脚本

#!/usr/bin/env python3
"""一键部署应用到远程服务器"""
import subprocess, sys, json, os
from datetime import datetime

class Deployer:
    def __init__(self, config_file):
        with open(config_file) as f:
            self.config = json.load(f)
        self.log_file = f"/var/log/deploy_{datetime.now():%Y%m%d}.log"

    def log(self, msg):
        ts = datetime.now().strftime("%H:%M:%S")
        line = f"[{ts}] {msg}"
        print(line)
        with open(self.log_file, "a") as f:
            f.write(line + "\n")

    def run_remote(self, host, cmd):
        r = subprocess.run(
            ["ssh", "-o", "ConnectTimeout=10", host, cmd],
            capture_output=True, text=True, timeout=60
        )
        return r.returncode, r.stdout.strip()

    def deploy(self):
        host = self.config["host"]
        app_dir = self.config["app_dir"]

        self.log(f"部署到 {host}:{app_dir}")

        # 备份
        self.log("备份当前版本...")
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.run_remote(host, f"cp -r {app_dir} {app_dir}.bak.{ts}")

        # 部署
        self.log("传输文件...")
        subprocess.run(["rsync", "-avz", "dist/", f"{host}:{app_dir}/"], check=True)

        # 重启
        self.log("重启服务...")
        code, out = self.run_remote(host, self.config.get("restart_cmd", "systemctl restart app"))
        if code != 0:
            self.log(f"❌ 重启失败: {out}")
            self.log("回滚中...")
            self.run_remote(host, f"rm -rf {app_dir} && mv {app_dir}.bak.{ts} {app_dir}")
            self.run_remote(host, "systemctl restart app")
            self.log("✅ 已回滚")
            return False

        self.log("✅ 部署成功")
        return True

if __name__ == "__main__":
    config_file = sys.argv[1] if len(sys.argv) > 1 else "deploy.json"
    Deployer(config_file).deploy()

28. 日志告警系统

#!/usr/bin/env python3
"""实时日志监控 + 告警"""
import time, hashlib, json
from pathlib import Path
from datetime import datetime

ALERT_COOLDOWN = 300  # 5分钟内同一条告警不重复
ALERT_FILE = Path("/tmp/alert_state.json")

def load_state():
    if ALERT_FILE.exists():
        return json.loads(ALERT_FILE.read_text())
    return {}

def save_state(state):
    ALERT_FILE.write_text(json.dumps(state))

def should_alert(msg, state):
    key = hashlib.md5(msg.encode()).hexdigest()
    now = time.time()
    if key in state and now - state[key] < ALERT_COOLDOWN:
        return False
    state[key] = now
    return True

def monitor_log(logfile, patterns=None):
    if patterns is None:
        patterns = ["error", "exception", "fatal", "oom", "timeout"]

    state = load_state()

    with open(logfile) as f:
        f.seek(0, 2)  # 跳到文件末尾
        while True:
            line = f.readline()
            if not line:
                time.sleep(0.5)
                continue

            line_lower = line.lower()
            for pattern in patterns:
                if pattern in line_lower:
                    msg = line.strip()
                    if should_alert(msg, state):
                        save_state(state)
                        alert = f"[{datetime.now():%Y-%m-%d %H:%M:%S}] ⚠️  {msg}"
                        print(alert)
                        # 这里可以发送到钉钉/飞书/邮件
                    break

if __name__ == "__main__":
    import sys
    logfile = sys.argv[1] if len(sys.argv) > 1 else "/var/log/syslog"
    print(f"监控 {logfile}...")
    monitor_log(logfile)

29. 数据库健康检查

#!/usr/bin/env python3
"""MySQL/PostgreSQL 健康检查"""
import subprocess, json, sys
from datetime import datetime

def check_mysql(host="localhost", port=3306):
    """检查 MySQL 连接和基本指标"""
    result = {}
    try:
        r = subprocess.run(
            ["mysqladmin", "-h", host, "-P", str(port), "ping"],
            capture_output=True, text=True, timeout=5
        )
        result["ping"] = r.returncode == 0

        if result["ping"]:
            # 获取连接数
            r = mysql_query(host, port,
                "SHOW STATUS WHERE Variable_name='Threads_connected'")
            for line in r.split("\n"):
                if "Threads_connected" in line:
                    result["connections"] = int(line.split()[-1])

            # 获取运行时间
            r = mysql_query(host, port,
                "SHOW STATUS WHERE Variable_name='Uptime'")
            for line in r.split("\n"):
                if "Uptime" in line:
                    result["uptime_hours"] = int(line.split()[-1]) // 3600

    except Exception as e:
        result["error"] = str(e)

    return result

def mysql_query(host, port, query):
    r = subprocess.run(
        ["mysql", "-h", host, "-P", str(port), "-N", "-e", query],
        capture_output=True, text=True, timeout=5
    )
    return r.stdout

def check_disk():
    r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True)
    line = r.stdout.strip().split("\n")[1].split()
    return {"mount": line[5], "usage": line[4], "available": line[3]}

if __name__ == "__main__":
    print(f"=== 健康检查 {datetime.now():%Y-%m-%d %H:%M:%S} ===")

    mysql = check_mysql()
    print(f"MySQL: {'✅' if mysql.get('ping') else '❌'}", end="")
    if "connections" in mysql:
        print(f" (连接数: {mysql['connections']}, 运行: {mysql.get('uptime_hours', '?')}h)")
    else:
        print()

    disk = check_disk()
    print(f"磁盘 /: {disk['usage']} 使用, {disk['available']} 可用")

30. 自动化报告生成

#!/usr/bin/env python3
"""生成系统状态 HTML 报告"""
import subprocess, os, platform
from datetime import datetime
from pathlib import Path

def get_system_info():
    """采集系统信息"""
    info = {}

    # CPU
    with open("/proc/loadavg") as f:
        info["load"] = f.read().split()[:3]

    # 内存
    with open("/proc/meminfo") as f:
        mem = {}
        for line in f:
            parts = line.split()
            mem[parts[0].rstrip(":")] = int(parts[1])
    info["mem_total"] = mem["MemTotal"] // 1024
    info["mem_avail"] = mem["MemAvailable"] // 1024

    # 磁盘
    r = subprocess.run(["df", "-h", "/"], capture_output=True, text=True)
    parts = r.stdout.strip().split("\n")[1].split()
    info["disk_size"] = parts[1]
    info["disk_used"] = parts[2]
    info["disk_pct"] = parts[4]

    return info

def generate_report(info):
    """生成 HTML 报告"""
    now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    mem_pct = (info["mem_total"] - info["mem_avail"]) / info["mem_total"] * 100

    html = f"""<!DOCTYPE html>
<html><head><meta charset="utf-8"><title>系统报告</title>
<style>
body {{ font-family: -apple-system, sans-serif; max-width: 800px; margin: 40px auto; padding: 20px; }}
.metric {{ background: #f5f5f5; padding: 15px; margin: 10px 0; border-radius: 8px; }}
.warning {{ background: #fff3cd; border-left: 4px solid #ffc107; }}
.critical {{ background: #f8d7da; border-left: 4px solid #dc3545; }}
h1 {{ color: #333; }}
</style></head><body>
<h1>📊 系统状态报告</h1>
<p>生成时间: {now}</p>
<p>主机名: {platform.node()}</p>
<div class="metric"><strong>CPU 负载:</strong> {', '.join(info['load'])}</div>
<div class="metric{'warning' if mem_pct > 80 else ''}">
  <strong>内存:</strong> {info['mem_total']}MB 总量, {info['mem_avail']}MB 可用 ({mem_pct:.1f}% 使用)
</div>
<div class="metric{'critical' if int(info['disk_pct'].rstrip('%')) > 90 else ''}">
  <strong>磁盘:</strong> {info['disk_used']}/{info['disk_size']} ({info['disk_pct']})
</div>
</body></html>"""
    return html

if __name__ == "__main__":
    info = get_system_info()
    html = generate_report(info)

    output = Path("report.html")
    output.write_text(html)
    print(f"✅ 报告已生成: {output.absolute()}")

🎯 学习路线

内容 实战目标
第一周 Shell 基础(1-6) 写服务器巡检脚本
第二周 Shell 进阶(7-14) 写服务管理脚本
第三周 Python 基础(15-21) 写日志分析工具
第四周 Python 进阶(22-30) 写自动化部署工具

学习方法

  1. 先理解原理,再看示例
  2. 改示例,改成自己的场景
  3. 解决实际问题,工作中遇到的重复任务写成脚本
  4. 积累代码库,把写过的脚本存好复用