每月博客-20161205

根据之前制定的计划:每两周至少阅读一段比较有意义的代码,每月写一篇博客。

之前计划阅读 JDK 的源码,但发现还是比较高深,一时不太好读懂,于是向同事要了一段他写过的代码来阅读分析。

个人感觉,拿出一段可运行、有意义的代码,描述它实现的功能,添加适当的注释后分享出来,对提升个人水平很有帮助;交流分享也能让大家共同进步。

具体的内容如下(未经授权,不得随意转载):

//程序实现功能:多线程并发读取文件并打印


//第一部分 Main函数
package com.loong.test;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Hashtable;

import com.loong.data.imp.BigFileReader;
import com.loong.data.imp.IHandle;

public class Main {
public static void main(String[] args) throws Exception {
long startTime = System.currentTimeMillis();

BigFileReader.Builder builder = new BigFileReader.Builder("D:\账号数据\aaaaa.unl", new IHandle(){
@Override
public void handle(String line, long index) {
System.out.println(line);
}
});
builder.setTreahdSize(4).setCharset("GBK").setBufferSize(1024*1024);
BigFileReader bigFileReader = builder.build();
bigFileReader.start();


}
}

//第二部分 实现类
package com.loong.data.imp;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Reads a file concurrently and hands every non-empty line to an {@link IHandle}.
 *
 * <p>The file is split into roughly {@code threadSize} byte ranges. Each range is extended
 * forward to the next line break so no line is split between two tasks. Each range is read
 * by a pool thread through memory-mapped buffers.
 *
 * <p>The handler is invoked from several pool threads at once, so it must tolerate concurrent
 * calls. The index passed with each line comes from a shared counter. It reflects processing
 * order, not the line's position in the file.
 */
public class BigFileReader {
    /** Largest region a single {@code FileChannel.map} call accepts. */
    private static final long MAX_MAP_SIZE = Integer.MAX_VALUE;

    // number of concurrent reader threads
    private final int threadSize;
    // charset used to decode each line; null means the platform default
    private final String charset;
    // size in bytes of each task's copy buffer
    private final int bufferSize;
    private final IHandle handle;
    // pool that runs one SliceReaderTask per slice
    private final ExecutorService executorService;
    // file length captured at construction time
    private final long fileLength;
    private final RandomAccessFile rAccessFile;
    // the [start, end] byte ranges assigned to the reader tasks
    private final Set<StartEndPail> startEndPails;
    // trips once every task has arrived, then prints a summary and shuts down
    private CyclicBarrier cyclicBarrier;
    // number of non-empty lines handed to the handler so far
    private final AtomicLong counter = new AtomicLong(0);

    // Private: instances are created through Builder.build().
    private BigFileReader(File file, IHandle handle, String charset, int bufferSize, int threadSize) {
        this.fileLength = file.length();
        this.handle = handle;
        this.charset = charset;
        this.bufferSize = bufferSize;
        this.threadSize = threadSize;
        try {
            this.rAccessFile = new RandomAccessFile(file, "r");
        } catch (FileNotFoundException e) {
            // Swallowing this used to leave rAccessFile null and fail later with an NPE.
            throw new IllegalArgumentException("cannot open file: " + file, e);
        }
        this.executorService = Executors.newFixedThreadPool(threadSize);
        this.startEndPails = new HashSet<StartEndPail>();
    }

    /**
     * Splits the file into slices and submits one reader task per slice, then returns.
     * After the last task finishes, the elapsed time and line count are printed and
     * {@link #shutdown()} is called.
     */
    public void start() {
        // Integer division yields 0 when the file is shorter than threadSize, which produced
        // a negative seek offset; every slice must cover at least one byte.
        long everySize = Math.max(1, this.fileLength / this.threadSize);
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            shutdown(); // do not leak the open file
            return;
        }
        final long startTime = System.currentTimeMillis();
        Runnable finisher = new Runnable() {
            @Override
            public void run() {
                System.out.println("use time: " + (System.currentTimeMillis() - startTime));
                System.out.println("all line: " + counter.get());
                shutdown();
            }
        };
        if (startEndPails.isEmpty()) {
            // Empty file: CyclicBarrier rejects zero parties, so finish directly.
            finisher.run();
            return;
        }
        cyclicBarrier = new CyclicBarrier(startEndPails.size(), finisher);
        for (StartEndPail pail : startEndPails) {
            System.out.println("分配分片: " + pail);
            this.executorService.execute(new SliceReaderTask(pail));
        }
    }

    /**
     * Fills startEndPails with consecutive, non-overlapping ranges covering
     * [start, fileLength - 1]. Each range is about size bytes long. It is extended so it ends
     * on a line break ('\n' or '\r') or at the last byte of the file.
     *
     * @param size nominal slice length; must be at least 1
     */
    private void calculateStartEnd(long start, long size) throws IOException {
        long sliceStart = start;
        while (sliceStart <= fileLength - 1) {
            StartEndPail pair = new StartEndPail();
            pair.start = sliceStart;
            long endPosition = sliceStart + size - 1;
            if (endPosition >= fileLength - 1) {
                endPosition = fileLength - 1;
            } else {
                endPosition = findLineBreak(endPosition);
            }
            pair.end = endPosition;
            startEndPails.add(pair);
            sliceStart = endPosition + 1;
        }
    }

    // Returns the first position >= from that holds '\n' or '\r', or fileLength - 1 if the
    // scan reaches the end of the file first.
    private long findLineBreak(long from) throws IOException {
        long position = from;
        rAccessFile.seek(position);
        int b = rAccessFile.read(); // read() advances the file pointer, so no further seeks
        while (b != '\n' && b != '\r') {
            position++;
            if (position >= fileLength - 1) {
                return fileLength - 1;
            }
            b = rAccessFile.read();
        }
        return position;
    }

    /** Closes the file and stops the thread pool from accepting new tasks. */
    public void shutdown() {
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    /**
     * Decodes one line's bytes with the configured charset. If the result is non-empty, it is
     * passed to the handler with the next counter value. Called concurrently by reader tasks.
     */
    public void handle(byte[] bytes) throws Exception {
        String line = (this.charset == null) ? new String(bytes) : new String(bytes, charset);
        if (!line.isEmpty()) {
            long index = counter.incrementAndGet();
            this.handle.handle(line, index);
        }
    }

    /** Inclusive byte range [start, end] of the file assigned to one reader task. */
    private static class StartEndPail {
        public long start;
        public long end;

        @Override
        public String toString() {
            return "stat=" + start + ";end=" + end;
        }

        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            StartEndPail other = (StartEndPail) obj;
            return end == other.end && start == other.start;
        }
    }

    /** Reads one slice, splits it on '\n'/'\r', and feeds each line to {@link #handle(byte[])}. */
    public class SliceReaderTask implements Runnable {
        private long start;
        private long sliceSize;
        private byte[] readBuff;

        public SliceReaderTask(StartEndPail pair) {
            this.start = pair.start;
            this.sliceSize = pair.end - pair.start + 1;
            this.readBuff = new byte[bufferSize];
        }

        @Override
        public void run() {
            try {
                readSlice();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always arrive at the barrier, even after a failure. Otherwise the other
                // tasks wait forever and shutdown() never runs.
                awaitBarrier();
            }
        }

        // Streams the slice through mapped windows, emitting a line at every line break and
        // a final line for any trailing bytes.
        private void readSlice() throws Exception {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            long position = start;
            long remaining = sliceSize;
            while (remaining > 0) {
                // FileChannel.map rejects regions larger than Integer.MAX_VALUE bytes.
                long windowSize = Math.min(remaining, MAX_MAP_SIZE);
                MappedByteBuffer mapBuffer =
                        rAccessFile.getChannel().map(MapMode.READ_ONLY, position, windowSize);
                while (mapBuffer.hasRemaining()) {
                    int readLength = Math.min(mapBuffer.remaining(), readBuff.length);
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        if (tmp == '\n' || tmp == '\r') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }
                position += windowSize;
                remaining -= windowSize;
            }
            if (bos.size() > 0) {
                handle(bos.toByteArray());
            }
        }

        // Waits for all other tasks; barrier failures are reported rather than propagated.
        private void awaitBarrier() {
            try {
                cyclicBarrier.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }

    /** Configures and creates a {@link BigFileReader}. */
    public static class Builder {
        private int threadSize = 1;
        private String charset = null;
        private int bufferSize = 1024 * 1024;
        private IHandle handle;
        private File file;

        /**
         * @param file path of the file to read
         * @param handle callback receiving each non-empty line
         * @throws IllegalArgumentException if the file does not exist or handle is null
         */
        public Builder(String file, IHandle handle) {
            this.file = new File(file);
            if (!this.file.exists()) {
                throw new IllegalArgumentException("文件不存在!");
            }
            if (handle == null) {
                throw new IllegalArgumentException("handle must not be null");
            }
            this.handle = handle;
        }

        /** Misspelled original name, kept for compatibility; same as {@link #setThreadSize}. */
        public Builder setTreahdSize(int size) {
            return setThreadSize(size);
        }

        /**
         * Sets the number of reader threads.
         *
         * @throws IllegalArgumentException if size is not positive
         */
        public Builder setThreadSize(int size) {
            if (size <= 0) {
                throw new IllegalArgumentException("threadSize must be positive: " + size);
            }
            this.threadSize = size;
            return this;
        }

        /** Sets the charset used to decode lines; null selects the platform default. */
        public Builder setCharset(String charset) {
            this.charset = charset;
            return this;
        }

        /**
         * Sets the per-task copy buffer size in bytes.
         *
         * @throws IllegalArgumentException if bufferSize is not positive (0 used to loop forever)
         */
        public Builder setBufferSize(int bufferSize) {
            if (bufferSize <= 0) {
                throw new IllegalArgumentException("bufferSize must be positive: " + bufferSize);
            }
            this.bufferSize = bufferSize;
            return this;
        }

        public BigFileReader build() {
            return new BigFileReader(this.file, this.handle, this.charset, this.bufferSize, this.threadSize);
        }
    }
}

原文地址:https://www.cnblogs.com/songtianbao/p/6135077.html