python运维开发(十三)SQLalchemy和paramiko续

内容目录:

ORM架构SQLalchemy

Paramiko

SQLalchemy对表的操作

使用 ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect 所有组件对数据进行操作。根据类创建对象,对象转换成SQL,执行SQL。

1、创建表

# 单表
class Test(Base):
    __tablename__ = 'test'
    nid = Column(Integer, primary_key=True,autoincrement=True)
    name = Column(String(32))
# 一对多
class Group(Base):
    __tablename__ = 'group'
    nid = Column(Integer, primary_key=True,autoincrement=True)
    caption = Column(String(32))

class User(Base):
    __tablename__ = 'user'
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))
    group_id = Column(Integer, ForeignKey('group.nid'))
    group = relationship("Group", backref='uuu')

    def __repr__(self):
        temp = "%s - %s: %s" %(self.nid, self.username, self.group_id)
        return temp
#多对多
class Host(Base): # metaclass,Host.table对象
    __tablename__ = 'host'
    nid = Column(Integer, primary_key=True,autoincrement=True)
    hostname = Column(String(32))
    port = Column(String(32))
    ip = Column(String(32))

    # host_user = relationship('HostUser', secondary=HostToHostUser, backref='h')
    host_user = relationship('HostUser', secondary=HostToHostUser.__table__, backref='h')

class HostUser(Base):
    __tablename__ = 'host_user'
    nid = Column(Integer, primary_key=True,autoincrement=True)
    username = Column(String(32))


def init_table():
    Base.metadata.create_all(engine)

def drop_table():
    Base.metadata.drop_all(engine)


init_table()
Session  = sessionmaker(bind=engine)
session = Session()

2、操作表

增加操作

# 方法1
session.add(Group(caption='DBA'))
session.add(Group(caption='SA'))
session.commit()
# 方法2
session.add_all([
    User(username='jabe1',group_id = 1),
    User(username='jabe2',group_id = 2)
])
session.commit()

删除表数据操作

session.query(Users).filter(Users.id > 2).delete()
session.commit()

修改表记录操作  

session.query(Users).filter(Users.id > 2).update({"name" : "099"})
session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
session.commit()

查询操作

#条件查询
ret = session.query(User).filter(User.username == 'jabe1').all()
print(ret)

#单表查询
ret = session.query(User).all()
obj = ret[0]
print(ret)
print(obj)
print(obj.nid)
print(obj.username)
print(obj.group_id)

#关联查询
sql = session.query(User).join(Group,isouter=True)
print(sql)
ret = session.query(User).join(Group,isouter=True).all()
print(ret)
# sql = session.query(User.username,Group.caption).join(Group,isouter=True)
# print(sql)
# ret = session.query(User.username,Group.caption).join(Group,isouter=True).all()
# print(ret)

#原始方式(查询用户名称和用户组)
# ret = session.query(User.username,Group.caption).join(Group,isouter=True).all()
# print(ret)
#新方式正向查询
# ret = session.query(User).all()
# for obj in ret:
#     #obj代指user表每一行数据
#     #obj.group 代指group对象
#     print(obj.nid,obj.username,obj.group_id,obj.group,obj.group.nid,obj.group.caption)


#原始方式(查询所有属于DBA组的用户)
ret = session.query(User.username,Group.caption).join(Group,isouter=True).filter(Group.caption == 'DBA').all()
print(ret)
#新方式(反向查询)
obj = session.query(Group).filter(Group.caption == 'DBA').first()
print(obj.nid)
print(obj.caption)
rint(obj.uuu)  

附加

# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
    or_(
        Users.id < 2,
        and_(Users.name == 'eric', Users.id > 3),
        Users.extra != ""
    )).all()


# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()

# 限制
ret = session.query(Users)[1:2]

# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()

# 分组
from sqlalchemy.sql import func

ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).all()

ret = session.query(
    func.max(Users.id),
    func.sum(Users.id),
    func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()

# 连表

ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()

ret = session.query(Person).join(Favor).all()

ret = session.query(Person).join(Favor, isouter=True).all()


# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()

q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
其他

  

Paramiko  

 堡垒机

堡垒机执行流程:

  1. 管理员为用户在服务器上创建账号(将公钥放置服务器,或者使用用户名密码)
  2. 用户登陆堡垒机,输入堡垒机用户名密码,现实当前用户管理的服务器列表
  3. 用户选择服务器,并自动登陆
  4. 执行操作并同时将用户操作记录

注:配置.brashrc实现ssh登陆后自动执行脚本,如:/usr/bin/python /tmp/s13/day13/s7.py

 练习代码:

import paramiko
import sys
import os
import socket
import getpass

from paramiko.py3compat import u

# windows does not have termios...
try:
    import termios
    import tty
    has_termios = True
except ImportError:
    has_termios = False


def interactive_shell(chan):
    if has_termios:
        posix_shell(chan)
    else:
        windows_shell(chan)


def posix_shell(chan):
    import select

    oldtty = termios.tcgetattr(sys.stdin)
    try:
        tty.setraw(sys.stdin.fileno())
        tty.setcbreak(sys.stdin.fileno())
        chan.settimeout(0.0)
        log = open('handle.log', 'a+', encoding='utf-8')
        flag = False
        temp_list = []
        while True:
            r, w, e = select.select([chan, sys.stdin], [], [])
            if chan in r:
                try:
                    x = u(chan.recv(1024))
                    if len(x) == 0:
                        sys.stdout.write('\r\n*** EOF\r\n')
                        break
                    if flag:
                        if x.startswith('\r\n'):
                            pass
                        else:
                            temp_list.append(x)
                        flag = False
                    sys.stdout.write(x)
                    sys.stdout.flush()
                except socket.timeout:
                    pass
            if sys.stdin in r:
                x = sys.stdin.read(1)
                import json

                if len(x) == 0:
                    break

                if x == '\t':
                    flag = True
                else:
                    temp_list.append(x)
                if x == '\r':
                    log.write(''.join(temp_list))
                    log.flush()
                    temp_list.clear()
                chan.send(x)

    finally:
        termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty)


def windows_shell(chan):
    import threading

    sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF.\r\n\r\n")

    def writeall(sock):
        while True:
            data = sock.recv(256)
            if not data:
                sys.stdout.write('\r\n*** EOF ***\r\n\r\n')
                sys.stdout.flush()
                break
            sys.stdout.write(data)
            sys.stdout.flush()

    writer = threading.Thread(target=writeall, args=(chan,))
    writer.start()

    try:
        while True:
            d = sys.stdin.read(1)
            if not d:
                break
            chan.send(d)
    except EOFError:
        # user hit ^Z or F6
        pass


def run():
    default_username = getpass.getuser()
    username = input('Username [%s]: ' % default_username)
    if len(username) == 0:
        username = default_username


    hostname = input('Hostname: ')
    if len(hostname) == 0:
        print('*** Hostname required.')
        sys.exit(1)

    tran = paramiko.Transport((hostname, 22,))
    tran.start_client()

    default_auth = "p"
    auth = input('Auth by (p)assword or (r)sa key[%s] ' % default_auth)
    if len(auth) == 0:
        auth = default_auth

    if auth == 'r':
        default_path = os.path.join(os.environ['HOME'], '.ssh', 'id_rsa')
        path = input('RSA key [%s]: ' % default_path)
        if len(path) == 0:
            path = default_path
        try:
            key = paramiko.RSAKey.from_private_key_file(path)
        except paramiko.PasswordRequiredException:
            password = getpass.getpass('RSA key password: ')
            key = paramiko.RSAKey.from_private_key_file(path, password)
        tran.auth_publickey(username, key)
    else:
        pw = getpass.getpass('Password for %s@%s: ' % (username, hostname))
        tran.auth_password(username, pw)

    # 打开一个通道
    chan = tran.open_session()
    # 获取一个终端
    chan.get_pty()
    # 激活器
    chan.invoke_shell()

    interactive_shell(chan)

    chan.close()
    tran.close()


if __name__ == '__main__':
    run()

参考url:http://www.cnblogs.com/wupeiqi/articles/5699254.html

原文地址:https://www.cnblogs.com/Jabe/p/5727590.html