0
点赞
收藏
分享

微信扫一扫

c#设计模式-行为型模式 之 模板方法模式

大雁f 2023-10-04 阅读 42

主函数:

# _*_ Coding : UTF-8 _*_
# @Time :  13:14
# @Author : YYZ
# @File : Flask
# @Project : Python_Project_爬虫
import json

from flask import Flask,request,jsonify
import ssh

api = Flask(__name__)

# methods: 指定请求方式


'''
接口解析参数
        host = host_info["host"]
        port = host_info["port"]
        service = host_info["service"]
        user = host_info["user"]
        pwd = host_info["pwd"]
'''
@api.route('/',methods=['POST'])
def install():
    # 请求方式为post时,可以使用 request.get_json()接收到JSON数据
    try:
        #host_info = request.get_json()  # 获取 POST 请求中的 JSON 数据
        host_info = request.get_data()
        # 如果得到的data是字符串格式,则需要用json.loads来变换成python格式,看个人需求
        host_info = json.loads(host_info)
        print(host_info)

    except Exception as e:
        return jsonify({'error': '请求数据失败'}), 400
    # 处理数据
    # 调用do_something_with_data函数来处理接收到的数据。
    try:
        connect = ssh.Sshclass(host_info["host"], host_info["user"], host_info["port"])  # 端口,用户,ssh端口
        connect.conn_by_pwd(host_info["pwd"])  # 输入密码,进行登录
    except Exception as e:
        return jsonify({'error': '连接失败'}), 888

    if  host_info["cmd"] :
        try:
            command_res = str(connect.exec_commond(host_info["cmd"]))    #执行命令
            print(command_res)
        except Exception as e:
            return jsonify({'error': '执行失败'}), 888

    if host_info["file-determine"] == "yes":
        try:
            file_res = connect.upload_file(host_info["local_path"],host_info["remote_path"],host_info["file_name"])
            print(host_info["file_name"]+str(file_res))
        except Exception as e:
            return jsonify({'error': '上传失败'}), 888
    return  "操作完成"


if __name__ == '__main__':
    api.run(host='0.0.0.0', port=8080, debug=True)

Ssh连接部分:

import paramiko
import json
'''
ssh 连接对象
本对象提供密钥连接、命令执行、关闭连接
'''

class Sshclass(object):

    # ip = ''
    # port = 22
    # username = ''
    # timeout = 0
    # ssh = None

    def __init__(self,ip,username,port=22,timeout=30):
        '''
        初始化ssh对象
        :param ip: 主机IP
        :param username: 登录用户名
        :param port: ssh端口号
        :param timeout: 连接超时
        :return:
        '''

        self.ip = ip
        self.username = username
        self.timeout = timeout
        self.port = port

        '''
         SSHClient作用类似于Linux的ssh命令,是对SSH会话的封装,该类封装了传输(Transport),通道(Channel)及SFTPClient建立的方法(open_sftp),通常用于执行远程命令。
         Paramiko中的几个基础名词:
         1、Channel:是一种类Socket,一种安全的SSH传输通道;
         2、Transport:是一种加密的会话,使用时会同步创建了一个加密的Tunnels(通道),这个Tunnels叫做Channel;
         3、Session:是client与Server保持连接的对象,用connect()/start_client()/start_server()开始会话。
        '''
        ssh = paramiko.SSHClient()
        #远程主机没有本地主机密钥或HostKeys对象时的连接方法,需要配置
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh = ssh

    def conn_by_key(self,key):
        '''
        密钥连接
        :param key:  str rsa密钥路径
        :return:  ssh连接对象
        '''
        rsa_key = paramiko.RSAKey.from_private_key(key)
        self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,pkey=rsa_key,timeout=self.timeout)
        if self.ssh:
            print('密钥连接成功')
        else:
            self.close()
            raise Exception('密钥连接失败')

    def conn_by_pwd(self,pwd):
        '''
        密码连接
        :param pwd: 登录密码
        :return:  ssh连接对象
        '''
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        self.ssh.connect(hostname=self.ip,port=self.port,username=self.username,password=pwd)
        if self.ssh:
            print('密码连接成功')

        else:
            self.close()
            raise Exception('密码连接失败')

    def exec_commond(self,command):
        '''
        命令控制
        :param commond: 命令
        :return: 返回结构
        '''

        if command:
            stdin, stdout, stderr = self.ssh.exec_command(command)
            return {
                "stdin":command,
                "stdout":stdout.read(),
                "stderr":stderr.read()
            }

        else:
            self.close()
            raise Exception("命令不能为空")

    def close(self):
        '''
        关闭当前连接
        :return:
        '''
        if self.ssh:
            self.ssh.close()
        else:
            raise Exception("ssh关闭连接失败,当前对象没有ssh连接。")

    def upload_file(self,local_path,remote_path,file_name):
        # sftp_link = paramiko.Transport(self.ip,self.port)
        # sftp_link.connect(username=self.username,password=pwd)
        # sftp = paramiko.SFTPClient.from_transport(sftp_link)
        sftp = self.ssh.open_sftp()
        try:
            sftp.put(local_path+"\\"+file_name, remote_path+"/"+file_name)
            #print("上传成功")
            return ("上传成功"), 200
        except Exception as e:
            return {'error': '上传失败'}, 888
        finally:
            sftp.close()
            self.close()

if __name__ == '__main__':
    ssh = Sshclass('192.168.115.23','root', port=22)
    pwd = "123456"
    local_path = 'D:\PyChrom\Python_Flask\自动化接口--Flask'
    remote_path = "/opt"
    file_name = 'file.py'
    ssh.conn_by_pwd(pwd)
    #res = str(ssh.exec_commond("ls /"))
    #print(res)
    res = ssh.upload_file(local_path,remote_path,file_name)
    print(res)

接口调试

后续优化思路:

目前只是远程连接+文件上传,后续会继续优化

弄个公共的nfs,平常一些脚本和包会放到这个nfs里,脚本或包自动从nfs里拉,然后执行脚本,即可部署,包括多机部署。

举报

相关推荐

0 条评论