Paramiko


什么是 Paramiko?

Paramiko 是一个 Python 实现的 SSH 协议库,提供了 SSH 客户端和 SSH 服务器的 API。它允许你通过 SSH 协议远程控制服务器,进行数据传输或在 Shell 中执行命令等操作。

如何安装 Paramiko?

Paramiko 可以使用 pip 安装,命令如下:

pip install paramiko

如何使用 Paramiko 连接 SSH 服务器? 使用 Paramiko 连接 SSH 服务器可以通过如下代码实现:

import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Execute command
command = 'your_ssh_command'
stdin, stdout, stderr = ssh.exec_command(command)
# Print output
print(stdout.read().decode())
# Close SSH connection
ssh.close()

如何使用 Paramiko 上传和下载文件? 可以使用 Paramiko 的 SFTP API 上传和下载文件:

import paramiko
# SFTP credentials
sftp_host = 'your_sftp_host'
sftp_user = 'your_sftp_username'
sftp_password = 'your_sftp_password'
# Establish SFTP connection
transport = paramiko.Transport((sftp_host, 22))
transport.connect(username=sftp_user, password=sftp_password)
sftp = transport.open_sftp()
# Download a remote file
remote_file_path = '/path/to/remote/file'
local_file_path = '/path/to/local/file'
sftp.get(remote_file_path, local_file_path)
# Upload a local file
local_file_path = '/path/to/local/file'
remote_file_path = '/path/to/remote/file'
sftp.put(local_file_path, remote_file_path)
# Close SFTP connection
sftp.close()
transport.close()

如何使用 Paramiko 执行 sudo 命令? 可以使用 Paramiko 的 invoke_shell() 方法来模拟一个终端会话,然后执行 sudo 命令:

import paramiko
# SSH credentials
ssh_host = 'your_ssh_host'
ssh_user = 'your_ssh_username'
ssh_password = 'your_ssh_password'
# Establish SSH connection
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(ssh_host, username=ssh_user, password=ssh_password)
# Start a shell session
shell = ssh.invoke_shell()
shell.send('sudo your_command\n')
# Wait for the password prompt
while not shell.recv_ready():
    pass
shell.send('your_password\n')
# Execute command
output = shell.recv(1024)
# Print output
print(output.decode())
 # Close SSH connection
ssh.close()

** 增加异常处理 **

import paramiko
import json


# SSH连接信息
hostname = "1.1.1.1"
username = "root"
password = "123456"
# 命令
command = "kubectl get node -o json"
# 创建SSH客户端
client = paramiko.SSHClient()
# 自动添加主机密钥
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())


# 连接SSH服务器
try:
    client.connect(hostname, username=username, password=password)
except paramiko.AuthenticationException:
    print("Authentication failed, please verify your password.")
except paramiko.SSHException as sshException:
    print("Unable to establish SSH connection: %s" % sshException)
except paramiko.SSHException as e:
    print(e)


# 执行命令
try:
    stdin, stdout, stderr = client.exec_command(command, timeout=10)
    output = stdout.read().decode()
    print(output)
    result = json.loads(output)
    print(result["kind"])
except paramiko.SSHException as sshException:
    print("Unable to execute command: %s" % sshException)

# 关闭连接
client.close()