一、ORM
dbfist——数据库优先
corefist——代码优先
我们可以通过类和对象来操作数据库了
连表操作
一对多
1、创建表,主动指定外键约束
2、操作:
类:repr
单表
连表
session.queue(表1).join(表2).all()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:@192.168.11.90:3306/wzc", max_overflow=5) Base = declarative_base() #单表 class Test(Base): __tablename__="test" nid=Column(Integer,primary_key=True,autoincrement=True) name=Column(String(32)) #一对多 class Group(Base): __tablename__ = "group" nid = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32)) class User(Base): __tablename__ = "User" nid = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32)) group_id=Column(Integer,ForeignKey('group.nid')) def __repr__(self): temp="%s-%s_%s" %(self.nid,self.name,self.group_id) return temp def init_db(): Base.metadata.create_all(engine) def drop_db(): Base.metadata.drop_all(engine) init_db()
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:@192.168.11.90:3306/wzc", max_overflow=5) Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() class Group(Base): __tablename__ = "group" nid = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32)) class User(Base): __tablename__ = "User" nid = Column(Integer, primary_key=True, autoincrement=True) name = Column(String(32)) group_id=Column(Integer,ForeignKey('group.nid')) group=relationship("Group",backref='uuu') ret=session.query(User).all() for obj in ret: print(obj.nid,obj.name,obj.group) obj=session.query(Group).filter(Group.caption=="dba").first() print(obj.nid) print(obj.aption) print(obj.uuu)
上面这个程序里面使用了反向查找,而反向查找主要是使用了relationship这个功能
我们之前都是用的正向查找,但是在有些时候我们需要使用反向查找来得到数据
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:@192.168.11.90:3306/wzc", max_overflow=5) Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() class Host(Base): __tablename__ = 'host' nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) class HostUser(Base): __tablename__ = 'host_user' nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) class HostToHostUser(Base): __tablename__ = 'host_to_host_user' nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer,ForeignKey('host.nid')) host_user_id = Column(Integer,ForeignKey('host_user.nid')) def init_db(): Base.metadata.create_all(engine) # init_db()#创建库 # session.add_all([ # Host(hostname='c1',port='22',ip='1.1.1.1'), # Host(hostname='c2',port='22',ip='1.1.1.2'), # Host(hostname='c3',port='22',ip='1.1.1.3'), # Host(hostname='c4',port='22',ip='1.1.1.4'), # Host(hostname='c5',port='22',ip='1.1.1.5'), # ]) # session.commit()#执行命令 #建表 # # session.add_all([ # HostUser(username='root'), # HostUser(username='db'), # HostUser(username='nb'), # HostUser(username='sb'), # ]) # session.commit() # # session.add_all([ # HostToHostUser(host_id=1,host_user_id=1), # HostToHostUser(host_id=1,host_user_id=2), # HostToHostUser(host_id=1,host_user_id=3), # HostToHostUser(host_id=2,host_user_id=2), # HostToHostUser(host_id=2,host_user_id=4), # HostToHostUser(host_id=2,host_user_id=3), # ]) # session.commit() host_obj=session.query(Host).filter(Host.hostname=="c1").first() host_2_host_user=session.query(HostToHostUser.host_user_id).filter(HostToHostUser.host_user_id==host_obj.nid).all() r=zip(*host_2_host_user) user=session.query(HostUser.username).filter(HostUser.nid.in_(list(r)[0])).all() print(user)
这个就是多对多,我们在创建的时候,首先创建了用户的表,然后创建了机器的的表,最后创建了他们的关系表,就是服务器上面有哪些用户
然后插入数据,表内容和表关系全部存在对应的表里面
多对多
1,创建表,额外的表关系
2,filter()
in_(都可以是另外一个查询,把另外一个查询结果放到这个查询里面,其实这个就是连表工能)
3,relationship,建立关系
4relationship更简便方式
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:@192.168.11.90:3306/wzc", max_overflow=5) Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() class Host(Base): __tablename__ = 'host' nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) class HostUser(Base): __tablename__ = 'host_user' nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) class HostToHostUser(Base): __tablename__ = 'host_to_host_user' nid = Column(Integer, primary_key=True,autoincrement=True) host_id = Column(Integer,ForeignKey('host.nid')) host_user_id = Column(Integer,ForeignKey('host_user.nid')) host=relationship("Host",backref='h') host_user=relationship("HostUser",backref='u') host_obj=session.query(Host).filter(Host.hostname=='c1').first() for i in host_obj.h: print(i.host_user.username)
这个是上面那边程序的简化版,就是灵活的运用多对多来进行使用的,多对多其实就是在一个关系表里面,制造多个一对多的节奏
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index,Table from sqlalchemy.orm import sessionmaker, relationship engine = create_engine("mysql+pymysql://root:@192.168.11.90:3306/wzc", max_overflow=5) Base = declarative_base() Session = sessionmaker(bind=engine) session = Session() HostToHostUser=Table('host_to_host_user',Base.metadata, Column('host_id',ForeignKey('host.nid'),primary_key=True), Column('host_user_id',ForeignKey('host_user.nid'),primary_key=True) ) class Host(Base): __tablename__ = 'host' nid = Column(Integer, primary_key=True,autoincrement=True) hostname = Column(String(32)) port = Column(String(32)) ip = Column(String(32)) host_user=relationship('HostUser',secondary=HostToHostUser,backref='h') class HostUser(Base): __tablename__ = 'host_user' nid = Column(Integer, primary_key=True,autoincrement=True) username = Column(String(32)) # class HostToHostUser(Base): # __tablename__ = 'host_to_host_user' # nid = Column(Integer, primary_key=True,autoincrement=True) # # host_id = Column(Integer,ForeignKey('host.nid')) # host_user_id = Column(Integer,ForeignKey('host_user.nid')) # # host=relationship("Host",backref='h') # host_user=relationship("HostUser",backref='u') host_obj=session.query(Host).filter(Host.hostname=='c1').first() print(host_obj.host_user)
这个方式更简便,里面表的对应关系,和之前的不一样了,之前是,a b ab关系表,这个是a b ab关系,但是在表a里面有关系信息,这样操作就更简便了
从关系表找其他表这个就是正向,从其他表找关系表这个就是反向
二、paramiko模块
SSHClient
用户名和密码:
SSHClient,TRANpory
SFTPclient
用户名和密码:
Tranport
需求:
命令上传文件,命令
import paramiko
import paramiko import uuid class SSHConnection(object): def __init__(self, host='192.168.11.90', port=22, username='root',pwd='123'): self.host = host self.port = port self.username = username self.pwd = pwd self.__k = None def run(self): self.connect() pass self.close() def connect(self): transport = paramiko.Transport((self.host,self.port)) transport.connect(username=self.username,password=self.pwd) self.__transport = transport def close(self): self.__transport.close() def cmd(self, command): ssh = paramiko.SSHClient() ssh._transport = self.__transport # 执行命令 stdin, stdout, stderr = ssh.exec_command(command) # 获取命令结果 result = stdout.read() return result def upload(self,local_path, target_path): # 连接,上传 sftp = paramiko.SFTPClient.from_transport(self.__transport) # 将location.py 上传至服务器 /tmp/test.py sftp.put(local_path, target_path) ssh = SSHConnection() ssh.connect() r1 = ssh.cmd('df') print(r1) ssh.upload('1.mp4', "/1.mp4") ssh.close()
这个是最基本的使用,使用ssh和sftp
import paramiko import sys import os import socket import select import getpass from paramiko.py3compat import u tran = paramiko.Transport(('192.168.11.90', 22,)) tran.start_client() tran.auth_password('root', '123') # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() while True: # 监视用户输入和服务器返回数据 # sys.stdin 处理用户输入 # chan 是之前创建的通道,用于接收服务器返回信息 readable, writeable, error = select.select([chan, sys.stdin, ], [], [], 1) if chan in readable: try: x = u(chan.recv(1024)) if len(x) == 0: print(' *** EOF ') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in readable: inp = sys.stdin.readline() chan.sendall(inp) chan.close() tran.close()
这个就可以在终端输入内容了,但是输入的时候不是实时的,需要执行一条回车一次
import paramiko import sys import os import socket import select import getpass from paramiko.py3compat import u tran = paramiko.Transport(('192.168.11.90', 22,)) tran.start_client() tran.auth_password('root', '123') # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() while True: # 监视用户输入和服务器返回数据 # sys.stdin 处理用户输入 # chan 是之前创建的通道,用于接收服务器返回信息 readable, writeable, error = select.select([chan, sys.stdin, ], [], [], 1) if chan in readable: try: x = u(chan.recv(1024)) if len(x) == 0: print(' *** EOF ') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in readable: inp = sys.stdin.readline() chan.sendall(inp) chan.close() tran.close()
这个是更简便的一个版本,每次点一个就会有返回
连接:
堡垒机
import paramiko import sys import os import socket import getpass from paramiko.py3compat import u # windows does not have termios... try: import termios import tty has_termios = True except ImportError: has_termios = False def interactive_shell(chan): if has_termios: posix_shell(chan) else: windows_shell(chan) def posix_shell(chan): import select oldtty = termios.tcgetattr(sys.stdin) try: tty.setraw(sys.stdin.fileno()) tty.setcbreak(sys.stdin.fileno()) chan.settimeout(0.0) flag = False temp_list = [] while True: r, w, e = select.select([chan, sys.stdin], [], []) if chan in r: try: x = u(chan.recv(1024)) if len(x) == 0: sys.stdout.write(' *** EOF ') break sys.stdout.write(x) sys.stdout.flush() except socket.timeout: pass if sys.stdin in r: x = sys.stdin.read(1) import json if len(x) == 0: break chan.send(x) finally: termios.tcsetattr(sys.stdin, termios.TCSADRAIN, oldtty) def windows_shell(chan): import threading sys.stdout.write("Line-buffered terminal emulation. Press F6 or ^Z to send EOF. ") def writeall(sock): while True: data = sock.recv(256) if not data: sys.stdout.write(' *** EOF *** ') sys.stdout.flush() break sys.stdout.write(data) sys.stdout.flush() writer = threading.Thread(target=writeall, args=(chan,)) writer.start() try: while True: d = sys.stdin.read(1) if not d: break chan.send(d) except EOFError: # user hit ^Z or F6 pass def run(): # 获取当前登录用户 username = raw_input('Username ') hostname = raw_input('Hostname: ') pwd = raw_input('password: ') tran = paramiko.Transport(('192.168.11.90', 22,)) tran.start_client() tran.auth_password('root', '123') # 打开一个通道 chan = tran.open_session() # 获取一个终端 chan.get_pty() # 激活器 chan.invoke_shell() interactive_shell(chan) chan.close() tran.close() if __name__ == '__main__': run()