1. 列表
#以下三式等价 c = (a>b and a or b) c = a if a>b else b c = [b, a][a>b]
字符串拼接
' + '.join('%s,%s'%(a,b) for a,b in zip(list('abc'), range(3)))
'a,0 + b,1 + c,2'
字符查找
'aa' in '大家aa' #True '大家aa'.find('大家') #0
for循环
from itertools import product result = product([1,2,3], ['a','b']) #[(1, 'a'), (1, 'b'), (2, 'a'), (2, 'b'), (3, 'a'), (3, 'b')]
数组重构,np.newaxis
np.arange(5)[:,None]
array([[0],
[1],
[2],
[3],
[4]])
数组拼接, np.r_[], np.c_[],注意是方括号
np.r_[np.array([1,2,3]), np.array([1,2,3])] #array([1, 2, 3, 1, 2, 3]) np.c_[np.array([1,2,3]), np.array([1,2,3])] #array([[1, 1], # [2, 2], # [3, 3]]) np.r_[np.array([[1,2,3],[1,2,3]]), np.array([[4,5,6],[4,5,6]])] #array([[1, 2, 3], # [1, 2, 3], # [4, 5, 6], # [4, 5, 6]]) np.c_[np.array([[1,2,3],[1,2,3]]), np.array([[4,5,6],[4,5,6]])] #array([[1, 2, 3, 4, 5, 6], # [1, 2, 3, 4, 5, 6]])
2. 嵌套列表展开,嵌套方法可实现多层列表展开
a=[[1,2],[3,4],[5,6]] print([j for i in a for j in i]) #列表生成式 sum(a, []) #求和函数 from functools import reduce reduce(lambda x,y: x+ y, a) #工具函数 from itertools import chain print(list(chain(*a))) import itertools print(list(itertools.chain.from_iterable(a))) t=[] [t.extend(i) for i in a] print(t) #递归的方法比较容易理解 #在stackoverflow看到大牛的列表生成式版本 func = lambda x: [y for l in x for y in func(l)] if type(x) is list else [x] #递归自定义 def expand_list(nested_list): for item in nested_list: if isinstance(item, (list, tuple)): for sub_item in expand_list(item): yield sub_item else: yield item
3. 工具函数
# map 把函数 f 依次作用在 list 的每个元素上,得到一个 iterator 并返回。 print(list(map(format_name, ['adam', 'LISA', 'barT']))) # reduce()传入的函数 f 必须接收两个参数,reduce()对list的每个元素反复调用函数f,并返回最终结果值。reduce()还可以接收第3个可选参数,作为计算的初始值。 from functools import reduce print(reduce(f, [1, 3, 5, 7, 9], 100)) # filter()根据判断结果自动过滤掉不符合条件的元素,返回由符合条件元素组成的iterator。 print(list(filter(is_odd, [1, 4, 6, 7, 9, 12, 17]))) #zip()参数可以接受任何类型的序列,同时也可以有两个以上的参数;当传入参数的长度不同时,zip能自动以最短序列长度为准进行截取,获得元组 [i for i in zip(['a','b'], (3,4,5))]
4. 时间
Python处理时间 time && datetime 模块
import time time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
5. 随机数
import numpy as np np.random.rand(10) np.random.randn(10) np.random.randint(10, size=10) np.random.shuffle(x) #类似洗牌,打乱顺序 np.random.permutation(citys.index) #返回一个随机排列 import random random.random() #只产生一个 random.randint(1, 10) random.uniform() random.randrange() #[random.randrange(0,9) for i in range(30)] random.shuffle(a) #a=[1,2,3], 直接对a的值修改
6. dataframe迭代(iteritems, iterrows, itertuples, apply)
import pandas as pd df = pd.DataFrame([{'c1':10, 'c2':100}, {'c1':11,'c2':110}, {'c1':12,'c2':120}]) [(i,j) for i,j in df.T.iteritems()] [(i,j) for i,j in df.iterrows()] [(i, j['c1'], j['c2']) for i,j in df.iterrows()] [(getattr(row, "c1"), getattr(row, "c2")) for row in df.itertuples(index=True, name='Pandas')] def valuation_formula(x, y): return x * y * 0.5 df['price'] = df.apply(lambda row: valuation_formula(row['c1'], row['c2']), axis=1)
7. dataframe修改索引
a.rename(columns={'A':'a', 'C':'c'}, inplace = True) #dataframe修改列名 a.index = ['1', '2', np.nan] # dataframe修改索引
8. 读写文件
读文件
import pandas as pd
import os
excel_data = pd.read_excel("./xx/xx.xlsx")
csv_data = pd.read_csv("./xx/xx.csv")
txt_data = pd.read_table("./xx/xx.txt")
json_data = pd.read_json("./xx/xx.json")
df = pd.read_clipboard(header=None) #从剪切板读取文件,参数同 pd.read_table
print('excel数据', os.linesep, excel_data)
分组txt文件读写
def write_list(path, li): extension = path.split('.')[-1] if extension == 'txt': with open(path, 'w', encoding='utf-8') as fp: #fp.writelines(li) fp.write(' '.join(li)) elif extension == 'csv': pd.DataFrame({'x':li}).to_csv(path, index=False, header=False) else: raise ValueError('扩展名只能指定为txt或csv...') def write_by_group(id_dict, outpath, size, extension): import os import math import pandas as pd if '/' in outpath: outfile = outpath + '/{}_{}.' + extension else: outfile = os.path.join(outpath, '{}_{}.txt') for gro_ in id_dict.keys(): ids = id_dict[gro_] st, ed = 0, 0 for i in range(math.ceil(len(ids)/size)): ed += size write_list(outfile.format(gro_, i), ids[st:ed]) st = ed def main(infile, outfile, size, extension): from collections import defaultdict with open(infile, 'r', encoding='utf-8') as fp: id_dict = defaultdict(lambda: []) for s in fp.readlines(): id_,gro_ = s.strip(' ').split('|') id_dict[gro_] += [id_] write_by_group(id_dict, outpath, size, extension) if __name__ == "__main__": infile = r'e:新建文本文档.txt' outpath = 'e:/新建文件夹' size = 3 main(infile, outpath, size, 'txt')
txt文件读写
import math def write_list(path, li): with open(path, 'w', encoding='utf-8') as fp: fp.writelines(li) def main(): with open('aa.txt', 'r', encoding='utf-8') as fp: ids = [s.split('|')[0]+' ' for s in fp.readlines()] n = 10 size = math.ceil(len(ids)/n) st, ed = 0, 0 for i in range(n): ed += size write_list('e:a{}.txt'.format(i), ids[st:ed]) st = ed if __name__ == "__main__": main()
内存文件读写
from io import StringIO #f = StringIO() #f.write('Hello World!') f = StringIO('Hello World!') string = f.readlines() print(string)
批量读取大文件
''' 报错:OSError: Initializing from file failed 解决:pd.read_csv('xxx.csv', chunksize=100, engine='python') ''' df = pd.read_csv('xxx.csv', chunksize=1000000) for chunk in df: print(chunk.shape) df = pd.read_csv('xxx.csv', iterator=True) df.get_chunk(1000000).shape def get_df(file): mylist = [] for chunk in pd.read_csv(file, chunksize=1000000): mylist.append(chunk) temp_df = pd.concat(mylist, axis=0) del mylist return temp_df.shape
写文件
df.to_csv("./xx/xx.csv", index=False, encoding='utf-8') df.to_excel("./xx/xx.xlsx") #utf_8_sig, gbk, gb2312, gb18030 df.to_dict(orient = 'records') #orient默认dict(字典套字典), 还可 ‘list’, ‘series’, ‘split’, ‘records’, ‘index’
excel文件的读写
import xlrd file_path = r'F:/test.xlsx' #路径前加 r,读取的文件路径 file_path = file_path.decode('utf-8') #文件路径的中文转码 data = xlrd.open_workbook(file_path) #获取数据 table = data.sheet_by_name('Sheet1') #获取sheet nrows = table.nrows #获取总行数 ncols = table.ncols #获取总列数 rowvalue = table.row_values(5) #获取一行的数值,例如第5行 col_values = table.col_values(6) #获取一列的数值,例如第6列 cell_value = table.cell(5,6).value #获取一个单元格的数值,例如第5行第6列
excel文件读写
import xlwt import xlrd from xlutils.copy import copy #打开需要操作的excel表 wb=xlrd.open_workbook(path) #复制原有表 newb=copy(wb) #新增sheet,参数是该sheet的名字,可自定义 wbsheet=newb.add_sheet(dl+'-'+dn) #向新sheet中写入数据。本代码中的d是某个dataframe wbsheet.write(0,0,'date') wbsheet.write(0,1,'visited') wbsheet.write(0,2,'success') for i in range(d.shape[0]): wbsheet.write(i + 1, 0, d.iloc[i, 0]) for j in range(1,d.shape[1]): wbsheet.write(i+1,j,int(d.iloc[i,j])) #获取原有excel表中sheet名为‘summary'的sheet sumsheet=newb.get_sheet('summary') #k表示该sheet的最后一行 k=len(sumsheet.rows) #想原有sheet后面新增数据 sumsheet.write(k,0,dl+'-'+dn) sumsheet.write(k,1,int(sum(d['visited']))) sumsheet.write(k,2,int(sum(d['success']))) #保存为原有的excel表路径 newb.save(path)
pd.read_excel,pd.to_excel参数
pandas.read_excel(io, sheet_name=0, header=0, skiprows=None, skip_footer=0, index_col=None, names=None, usecols=None, parse_dates=False, date_parser=None, na_values=None, thousands=None, convert_float=True, converters=None, dtype=None, true_values=None, false_values=None, engine=None, squeeze=False, **kwds) DataFrame.to_excel(excel_writer, sheet_name='Sheet1', na_rep='', float_format=None, columns=None, header=True, index=True, index_label=None, startrow=0, startcol=0, engine=None, merge_cells=True, encoding=None, inf_rep='inf', verbose=True, freeze_panes=None)
追加excel文件,不覆盖原有数据
def excelAddSheet(dataframe, filepath, sheet_name): import os from openpyxl import load_workbook if os.path.exists(filepath): #当表名已存在时,后面还可以添加参数,进行追加 book = load_workbook(filepath) #FileNotFoundError excelWriter = pd.ExcelWriter(filepath, engine='openpyxl') excelWriter.book = book dataframe.to_excel(excel_writer=excelWriter, sheet_name=sheet_name, index=None) excelWriter.save() excelWriter.close() else: dataframe.to_excel(filepath, sheet_name=sheet_name, index=None) excelAddSheet(df2[:2], filepath, 'Sheet1') excelAddSheet(df2[:2], filepath, 'Sheet2')
9. 数据库文件读写
import pandas as pd from sqlalchemy import create_engine, types config = {db_flag : "mysql", host_ip : "127.0.0.1", host_port : 3306, db_name : "test", table_name : "aaa", user : "root", pawd : "root", charset : "utf-8"} engine_config = '''mysql+pymysql://{user}:{pawd}@{host_ip}:{host_port}/{db_name}?charset={charset}'''.format(**config) engine= create_engine(engine_config) pd.read_sql_query(sql, engine) conn = engine.connect() df.to_sql(table_name, conn, if_exists='append', index=False)
10. 断言 - 单元测试
assert 1==1 assert 1==2 #Traceback (most recent call last): # File "<ipython-input-174-730332727407>", line 1, in <module> # assert 1==2 #AssertionError #等价于 if not 1==2: raise AssertionError #Traceback (most recent call last): # File "<ipython-input-176-bd2dcc9e33c8>", line 2, in <module> # raise AssertionError #AssertionError #try ... except .. else #自定义新异常 class Networkerror(RuntimeError): def __init__(self, arg): self.args = arg try: raise Networkerror("Bad hostname") except Networkerror,e: print e.args