UserScan的处理流程分析

UserScan的处理流程分析

前置说明

Userscan是通过clientcp中发起的scanner操作。

Scan中通过caching属性来返回能够返回多少条数据。每次进行next时。

通过batch属性来设置每次在rs端每次nextkv时,可读取多少个kv(在同一行的情况下)

在生成Scan实例时。最好是把familycolumn都设置上,这样能保证查询的最高效.

client端通过生成Scan实例,通过HTable下的例如以下方法得到ClientScanner实例

publicResultScannergetScanner(finalScan scan)

在生成的ClientScanner实例中的callable属性的值为生成的一个ScannerCallable实例。

并通过callable.prepare(tries!= 0);方法得到此scanstartkey所在的regionlocation.meta表中。

startkey相应的location中得到此locationHRegionInfo信息。

并设置ClientScanner.currentRegion的值为当前的region.也就是startkey所在的region.


通过ClientScanner.nextrs发起rpc调用操作。

调用HRegionServer.scan

publicScanResponse scan(finalRpcControllercontroller,finalScanRequest request)



ClientScanner.next时,首先是发起openScanner操作,得到一个ScannerId

通过ScannerCallable.call方法:

if(scannerId== -1L) {

this.scannerId= openScanner();

} else{

openScanner方法:中发起一个scan操作,通过rpc调用rs.scan

ScanRequest request=

RequestConverter.buildScanRequest(

getLocation().getRegionInfo().getRegionName(),

this.scan,0, false);

try{

ScanResponse response= getStub().scan(null,request);

longid =response.getScannerId();

if(logScannerActivity){

LOG.info("Openscanner=" + id+ " for scan="+ scan.toString()

+ "on region " +getLocation().toString());

}

returnid;


HregionServer.scan中对openScanner的处理:

publicScanResponse scan(finalRpcControllercontroller,finalScanRequest request)

throwsServiceException {

Leases.Lease lease= null;

String scannerName= null;

........................................非常多代码没有显示

requestCount.increment();


intttl = 0;

HRegion region= null;

RegionScannerscanner =null;

RegionScannerHolder rsh= null;

booleanmoreResults= true;

booleancloseScanner= false;

ScanResponse.Builder builder= ScanResponse.newBuilder();

if(request.hasCloseScanner()){

closeScanner= request.getCloseScanner();

}

introws = 1;

if(request.hasNumberOfRows()){

rows= request.getNumberOfRows();

}

if(request.hasScannerId()){

.................................非常多代码没有显示

} else{

得到请求的HRegion实例,也就是startkey所在的HRegion

region= getRegion(request.getRegion());

ClientProtos.Scan protoScan= request.getScan();

booleanisLoadingCfsOnDemandSet= protoScan.hasLoadColumnFamiliesOnDemand();

Scan scan= ProtobufUtil.toScan(protoScan);

//if the request doesn't set this, get the default region setting.

if(!isLoadingCfsOnDemandSet){

scan.setLoadColumnFamiliesOnDemand(region.isLoadingCfsOnDemandDefault());

}

scan.getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_ENABLE);

假设scan没有设置family,region中全部的family当成scanfamily

region.prepareScanner(scan);

if(region.getCoprocessorHost()!= null){

scanner= region.getCoprocessorHost().preScannerOpen(scan);

}

if(scanner ==null){

运行HRegion.getScanner方法。生成HRegion.RegionScannerImpl方法

scanner= region.getScanner(scan);

}

if(region.getCoprocessorHost()!= null){

scanner= region.getCoprocessorHost().postScannerOpen(scan,scanner);

}

把生成的RegionScanner加入到scanners集合容器中。

并设置scannerid(一个随机的值),

scannernamescanneridstring版本号。加入过期监控处理,

通过hbase.client.scanner.timeout.period配置过期时间,默认值为60000ms

老版本号通过hbase.regionserver.lease.period配置。

过期检查线程通过Leases完毕。

scanner的过期处理通过一个

HregionServer.ScannerListener.leaseExpired实例来完毕。


scannerId= addScanner(scanner,region);

scannerName= String.valueOf(scannerId);

ttl= this.scannerLeaseTimeoutPeriod;

}

............................................非常多代码没有显示


Hregion.getScanner方法生成RegionScanner实例流程


publicRegionScannergetScanner(Scanscan)throwsIOException {

returngetScanner(scan,null);

}


层次的调用,此时传入的kvscannerlistnull

protectedRegionScannergetScanner(Scanscan,

List<KeyValueScanner>additionalScanners)throwsIOException {

startRegionOperation(Operation.SCAN);

try{

//Verify families are all valid

prepareScanner(scan);

if(scan.hasFamilies()){

for(byte[] family :scan.getFamilyMap().keySet()){

checkFamily(family);

}

}

returninstantiateRegionScanner(scan,additionalScanners);

}finally{

closeRegionOperation();

}

}


终于生成一个HRegion.RegionScannerImpl实例

protectedRegionScannerinstantiateRegionScanner(Scanscan,

List<KeyValueScanner>additionalScanners)throwsIOException {

returnnewRegionScannerImpl(scan,additionalScanners,this);

}


RegionScanner实例的生成构造方法:

RegionScannerImpl(Scanscan,List<KeyValueScanner>additionalScanners,HRegion region)

throwsIOException {


this.region= region;

this.maxResultSize= scan.getMaxResultSize();

if(scan.hasFilter()){

this.filter= newFilterWrapper(scan.getFilter());

} else{

this.filter= null;

}


this.batch= scan.getBatch();

if(Bytes.equals(scan.getStopRow(),HConstants.EMPTY_END_ROW)&& !scan.isGetScan()){

this.stopRow= null;

} else{

this.stopRow= scan.getStopRow();

}

//If we are doing a get, we want to be [startRow,endRow] normally

//it is [startRow,endRow) and if startRow=endRow we get nothing.

this.isScan= scan.isGetScan()?

-1 : 0;


//synchronize on scannerReadPoints so that nobody calculates

//getSmallestReadPoint, before scannerReadPoints is updated.

IsolationLevelisolationLevel= scan.getIsolationLevel();

synchronized(scannerReadPoints){

if(isolationLevel== IsolationLevel.READ_UNCOMMITTED){

//This scan can read even uncommitted transactions

this.readPt= Long.MAX_VALUE;

MultiVersionConsistencyControl.setThreadReadPoint(this.readPt);

} else{

this.readPt= MultiVersionConsistencyControl.resetThreadReadPoint(mvcc);

}

scannerReadPoints.put(this,this.readPt);

}


//Here we separate all scanners into two lists - scanner that providedata required

//by the filter to operate (scanners list) and all others(joinedScanners list).

List<KeyValueScanner>scanners =newArrayList<KeyValueScanner>();

List<KeyValueScanner>joinedScanners= newArrayList<KeyValueScanner>();

if(additionalScanners!= null){

scanners.addAll(additionalScanners);

}

迭代每个要进行scanstore。生成详细的StoreScanner实例。

通常情况下joinedHead的值为null

for(Map.Entry<byte[],NavigableSet<byte[]>>entry :

scan.getFamilyMap().entrySet()){

Storestore =stores.get(entry.getKey());

生成StoreScanner实例。通过HStore.getScanner(scan,columns);

KeyValueScannerscanner =store.getScanner(scan,entry.getValue());

if(this.filter== null|| !scan.doLoadColumnFamiliesOnDemand()

||this.filter.isFamilyEssential(entry.getKey())){

scanners.add(scanner);

} else{

joinedScanners.add(scanner);

}

}

生成KeyValueHeap实例。把全部的storescanner的開始位置移动到startkey的位置并得到topStoreScanner,

this.storeHeap= newKeyValueHeap(scanners,comparator);

if(!joinedScanners.isEmpty()){

this.joinedHeap= newKeyValueHeap(joinedScanners,comparator);

}

}


得到StoreScanner实例的HStore.getScanner(scan,columns)方法

publicKeyValueScannergetScanner(Scanscan,

finalNavigableSet<byte[]> targetCols)throwsIOException {

lock.readLock().lock();

try{

KeyValueScannerscanner =null;

if(this.getCoprocessorHost()!= null){

scanner= this.getCoprocessorHost().preStoreScannerOpen(this,scan,targetCols);

}

if(scanner ==null){

scanner= newStoreScanner(this,getScanInfo(),scan,targetCols);

}

returnscanner;

}finally{

lock.readLock().unlock();

}

}

生成StoreScanner的构造方法:

publicStoreScanner(Storestore,ScanInfo scanInfo,Scan scan,finalNavigableSet<byte[]>columns)

throwsIOException {

this(store,scan.getCacheBlocks(),scan,columns,scanInfo.getTtl(),

scanInfo.getMinVersions());

假设设置有scan_raw_属性时。columns的值须要为null

if(columns !=null&& scan.isRaw()){

thrownewDoNotRetryIOException(

"Cannotspecify any column for a raw scan");

}

matcher= newScanQueryMatcher(scan,scanInfo,columns,

ScanType.USER_SCAN,Long.MAX_VALUE,HConstants.LATEST_TIMESTAMP,

oldestUnexpiredTS);

得到StoreFileScanner,StoreFileScanner中引用的StoreFile.Reader中引用HFileReaderV2,

HFileReaderV2的实例在StoreFile.Reader中假设已经存在。不会又一次创建,这样会加快scanner的创建时间。

//Pass columns to try to filter out unnecessary StoreFiles.

List<KeyValueScanner>scanners =getScannersNoCompaction();


//Seek all scanners to the start of the Row (or if the exact matchingrow

//key does not exist, then to the start of the next matching Row).

//Always check bloom filter to optimize the top row seek for delete

//family marker.

if(explicitColumnQuery&& lazySeekEnabledGlobally){

for(KeyValueScannerscanner :scanners) {

scanner.requestSeek(matcher.getStartKey(),false,true);

}

}else{

if(!isParallelSeekEnabled){

for(KeyValueScannerscanner :scanners) {

scanner.seek(matcher.getStartKey());

}

} else{

parallelSeek(scanners,matcher.getStartKey());

}

}


//set storeLimit

this.storeLimit= scan.getMaxResultsPerColumnFamily();


//set rowOffset

this.storeOffset= scan.getRowOffsetPerColumnFamily();


//Combine all seekedscanners with a heap

heap= newKeyValueHeap(scanners,store.getComparator());

注冊,假设有storefile更新时。把更新后的storefile加入到这个StoreScanner中来。

this.store.addChangedReaderObserver(this);

}


发起scanrpc操作

client端发起openScanner操作后,得到一个scannerId.此时发起scan操作。

通过ScannerCallable.call中发起call的操作,在scannerId不等于-1时。


Result [] rrs= null;

ScanRequest request= null;

try{

incRPCcallsMetrics();

request= RequestConverter.buildScanRequest(scannerId,caching,false,nextCallSeq);

ScanResponse response= null;

PayloadCarryingRpcControllercontroller= newPayloadCarryingRpcController();

try{

controller.setPriority(getTableName());

response= getStub().scan(controller,request);

...................................此处省去一些代码

nextCallSeq++;

longtimestamp =System.currentTimeMillis();

//Results are returned via controller

CellScannercellScanner= controller.cellScanner();

rrs= ResponseConverter.getResults(cellScanner,response);



HregionServer.scan方法中对scan时的处理流程:

得到scan中的caching属性的值,此值主要用来响应client返回的条数。

假设一行数据包括多个kv,算一条

introws = 1;

if(request.hasNumberOfRows()){

rows= request.getNumberOfRows();

}

假设client传入的scannerId有值,也就是不等于-1时,表示不是openScanner操作,检查scannerid是否过期

if(request.hasScannerId()){

rsh= scanners.get(scannerName);

if(rsh ==null){

LOG.info("Clienttried to access missing scanner " +scannerName);

thrownewUnknownScannerException(

"Name:" + scannerName+ ", already closed?");

}

此处主要是检查region是否发生过split操作。假设是会出现NotServingRegionException操作。

scanner= rsh.s;

HRegionInfo hri= scanner.getRegionInfo();

region= getRegion(hri.getRegionName());

if(region !=rsh.r){ // Yes, should be the same instance

thrownewNotServingRegionException("Regionwas re-opened after the scanner"

+ scannerName+ " was created: "+ hri.getRegionNameAsString());

}

} else{

...................................此处省去一些生成Regionscanner的代码

}

表示有设置caching,假设是运行scan,此时的默认值为1,当前scan中设置有caching后,使用scan中设置的值

if(rows >0) {

//if nextCallSeq does not match throw Exception straight away. Thisneeds to be

//performed even before checking of Lease.

//See HBASE-5974

是否有配置nextCallSeq的值。第一次调用时,此值为0,每调用一次加一,client也一样,每调用一次加一。

if(request.hasNextCallSeq()){

if(rsh ==null){

rsh= scanners.get(scannerName);

}

if(rsh !=null){

if(request.getNextCallSeq()!= rsh.nextCallSeq){

thrownewOutOfOrderScannerNextException("ExpectednextCallSeq: " + rsh.nextCallSeq

+ "But the nextCallSeq got from client: "+ request.getNextCallSeq()+

";request=" +TextFormat.shortDebugString(request));

}

//Increment the nextCallSeq value which is the next expected fromclient.

rsh.nextCallSeq++;

}

}

try{

先从租约管理中移出此租约,防止查找时间大于过期时间而出现的超时

//Remove lease while its being processed in server; protects againstcase

//where processing of request takes > lease expiration time.

lease= leases.removeLease(scannerName);

生成要返回的条数的一个列表。scan.caching

List<Result>results =newArrayList<Result>(rows);

longcurrentScanResultSize= 0;


booleandone =false;

调用cppreScannernext,假设返回为true,表示不在运行scan操作。

//Call coprocessor. Get region info from scanner.

if(region !=null&& region.getCoprocessorHost()!= null){

Boolean bypass= region.getCoprocessorHost().preScannerNext(

scanner,results,rows);

if(!results.isEmpty()){

for(Result r :results) {

if(maxScannerResultSize< Long.MAX_VALUE){

for(Cellkv :r.rawCells()){

//TODO

currentScanResultSize+= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

}

}

}

if(bypass !=null&& bypass.booleanValue()){

done= true;

}

}

运行scan操作。

CppreScannerNext返回为false,或没有设置cp(主要是RegionObServer)

返回给client的最大size通过hbase.client.scanner.max.result.size配置。默觉得long.maxvalue

假设scan也设置有maxResultSize,使用scan设置的值

if(!done) {

longmaxResultSize= scanner.getMaxResultSize();

if(maxResultSize<= 0) {

maxResultSize= maxScannerResultSize;

}

List<Cell>values =newArrayList<Cell>();

MultiVersionConsistencyControl.setThreadReadPoint(scanner.getMvccReadPoint());

region.startRegionOperation(Operation.SCAN);

try{

inti = 0;

synchronized(scanner){

此处開始迭代,開始调用regionScanner(HRegion.RegionScannerImpl.nextRaw(List))进行查找,

迭代的长度为scan设置的caching的大小,假设运行RegionScanner.nextRaw(List)返回为false,时也会停止迭代

for(; i <rows

&&currentScanResultSize< maxResultSize;i++) {

返回的true表示还有数据。能够接着查询,否则表示此region中已经没有符合条件的数据了。

//Collect values to be returned here

booleanmoreRows =scanner.nextRaw(values);

if(!values.isEmpty()){

if(maxScannerResultSize< Long.MAX_VALUE){

for(Cellkv :values) {

currentScanResultSize+= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

}

results.add(Result.create(values));

}

if(!moreRows){

break;

}

values.clear();

}

}

region.readRequestsCount.add(i);

} finally{

region.closeRegionOperation();

}


//coprocessor postNext hook

if(region !=null&& region.getCoprocessorHost()!= null){

region.getCoprocessorHost().postScannerNext(scanner,results,rows,true);

}

}

假设没有能够再查找的数据时。设置responsemoreResultsfalse

//If the scanner's filter - if any - is done with the scan

//and wants to tell the client to stop the scan. This is done bypassing

//a null result, and setting moreResults to false.

if(scanner.isFilterDone()&& results.isEmpty()){

moreResults= false;

results= null;

} else{

加入结果到response中,假设hbase.client.rpc.codec配置有codec的值,

默认取hbase.client.default.rpc.codec配置的值。默觉得KeyValueCodec

假设上面说的codec配置不为null时,把results生成为一个iterator,并生成一个匿名的CallScanner实现类

设置到scan时传入的controller中。这样能提升查询数据的读取性能。

假设没有配置codec时。默认直接把results列表设置到response中,这样响应的数据可能会比較大。

addResults(builder,results,controller);

}

} finally{

又一次把租约放入到租约检查管理器中,此租约主要来检查client多长时间没有发起过scan的操作。

//We're done. On way out re-add the above removed lease.

//Adding resets expiration time on lease.

if(scanners.containsKey(scannerName)){

if(lease !=null)leases.addLease(lease);

ttl= this.scannerLeaseTimeoutPeriod;

}

}

}


client端获取响应的数据:ScannerCallable.call方法中

rrs= ResponseConverter.getResults(cellScanner,response);


ResponseConverter.getResults方法的实现

publicstaticResult[] getResults(CellScannercellScanner,ScanResponse response)

throwsIOException {

if(response== null)returnnull;

//If cellscanner,then the number of Results to return is the count of elements in the

//cellsPerResult list. Otherwise, it is how many results are embeddedinside the response.

intnoOfResults= cellScanner!= null?

response.getCellsPerResultCount():response.getResultsCount();

Result[] results= newResult[noOfResults];

for(inti = 0; i< noOfResults;i++) {

cellScanner假设codec配置为有值时,在rs响应时会生成一个匿名的实现

if(cellScanner!= null){

......................................

intnoOfCells =response.getCellsPerResult(i);

List<Cell>cells = newArrayList<Cell>(noOfCells);

for(intj = 0; j< noOfCells;j++) {

try{

if(cellScanner.advance()== false){

.....................................

String msg= "Results sent from server="+ noOfResults+ ". But only got "+ i

+ "results completely at client. Resetting the scanner to scan again.";

LOG.error(msg);

thrownewDoNotRetryIOException(msg);

}

} catch(IOException ioe){

...........................................

LOG.error("Exceptionwhile reading cells from result."

+ "Resettingthe scanner to scan again.", ioe);

thrownewDoNotRetryIOException("Resettingthe scanner.", ioe);

}

cells.add(cellScanner.current());

}

results[i]= Result.create(cells);

} else{

否则,没有设置codec。直接从response中读取出来数据,

//Result is pure pb.

results[i]= ProtobufUtil.toResult(response.getResults(i));

}

}

returnresults;

}


ClientScanner.next方法中,假设还没有达到scancaching的值,(默觉得1)也就是countdown的值还不等于0

,countdown的值为得到一个Result时减1,通过nextScanner又一次得到下一个region,并发起连接去scan数据。


Do{

.........................此处省去一些代码。

if(values !=null&& values.length> 0) {

for(Result rs: values) {

cache.add(rs);

for(Cellkv :rs.rawCells()){

//TODOmake method in Cell or CellUtil

remainingResultSize-= KeyValueUtil.ensureKeyValue(kv).heapSize();

}

countdown--;

this.lastResult= rs;

}

}

}while(remainingResultSize> 0 && countdown> 0 && nextScanner(countdown,values ==null));


对于这样的类型的查询操作。能够使用得到一个ClientScanner后,不运行close操作。

rstimeout前每次定期去从rs中拿一定量的数据下来。

缓存到ClientScannercache中。

每次next时从cache中直接拿数据


Hregion.RegionScannerImpl.nextRaw(list)方法分析

RegionScannerImpl是对RegionScanner接口的实现。

Rsscan在运行时通过regionScanner.nextRaw(list)来获取数据。

通过regionScanner.isFilterDone来检查此region的查找是否完毕。


调用nextRaw方法,此方法调用还有一个重载方法,batchscan中设置的每次可查询最大的单行中的多少个kvkv个数

publicbooleannextRaw(List<Cell>outResults)

throwsIOException {

returnnextRaw(outResults,batch);

}


publicbooleannextRaw(List<Cell>outResults,intlimit)throwsIOException {

booleanreturnResult;

调用nextInternal方法。

if(outResults.isEmpty()){

//Usually outResults is empty. This is true when next is called

//to handle scan or get operation.

returnResult= nextInternal(outResults,limit);

} else{

List<Cell>tmpList =newArrayList<Cell>();

returnResult= nextInternal(tmpList,limit);

outResults.addAll(tmpList);

}

调用filter.reset方法,清空当前rowfilter的相关信息。

ResetFilters();

假设filter.filterAllRemaining()的返回值为true,时表示当前region的查找条件已经结束,不能在运行查找操作。

没有能够接着查找的须要,也就是没有很多其它要查找的行了。

if(isFilterDone()){

returnfalse;

}

................................此处省去一些代码

returnreturnResult;

}


nextInternal方法处理流程:

privatebooleannextInternal(List<Cell>results,intlimit)

throwsIOException {

if(!results.isEmpty()){

thrownewIllegalArgumentException("Firstparameter should be an empty list");

}

RpcCallContextrpcCall =RpcServer.getCurrentCall();

//The loop here is used only when at some point during the next wedetermine

//that due to effects of filters or otherwise, we have an empty row inthe result.

//Then we loop and try again. Otherwise, we must get out on the firstiteration via return,

//"true" if there's more data to read, "false" ifthere isn't (storeHeap is at a stop row,

//and joinedHeap has no more data to read for the last row (if set,joinedContinuationRow).

while(true){

if(rpcCall !=null){

//If a user specifies a too-restrictive or too-slow scanner, the

//client might time out and disconnect while the server side

//is still processing the request. We should abort aggressively

//in that case.

longafterTime =rpcCall.disconnectSince();

if(afterTime>= 0) {

thrownewCallerDisconnectedException(

"Abortingon region " +getRegionNameAsString()+ ", call "+

this+ " after "+ afterTime+ " ms, since "+

"callerdisconnected");

}

}

得到通过startkeyseek后当前最小的一个kv

//Let's see what we have in the storeHeap.

KeyValue current= this.storeHeap.peek();


byte[]currentRow= null;

intoffset = 0;

shortlength = 0;

if(current !=null){

currentRow= current.getBuffer();

offset= current.getRowOffset();

length= current.getRowLength();

}

检查是否到了stopkey,假设是。返回false,joinedContinuationRow是多个cf的关联查找。不用去管它

booleanstopRow =isStopRow(currentRow,offset,length);

//Check if we were getting data from the joinedHeap and hit the limit.

//If not, then it's main path - getting results from storeHeap.

if(joinedContinuationRow== null){

//First, check if we are at a stop row. If so, there are no moreresults.

if(stopRow) {

假设是stopRow,同一时候filter.hasFilterRow返回为true时。

可通过filterRowCells来检查要返回的kvlist,也能够用来改动要返回的kvlist

if(filter !=null&& filter.hasFilterRow()){

filter.filterRowCells(results);

}

returnfalse;

}

通过filter.filterRowkey来过滤检查key是否须要排除,假设是排除返回true,否则返回false

//Check if rowkey filter wants to exclude this row. If so, loop tonext.

//Technically, if we hit limits before on this row, we don't need thiscall.

if(filterRowKey(currentRow,offset,length)) {

假设rowkey是须要排除的rowkey,检查是否有下一行数据。

假设没有下一行数据。返回flase,表示当前region查找结束

否则清空当前的results,又一次进行查找

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

results.clear();

continue;

}

開始运行region下此scan须要的全部storeStoreScannernext进行查找,把查找的结果放到results列表中。

假设一行中包括有多个kv,如今查找这些kv达到传入的limit的大小的时候,返回kv_limit的一个空的kv

(查找的大小已经达到limit(batch)的一行最大scankv个数,返回kv_limit),

否则表示还没有查找到limitkv个数,可是当前row相应的全部达到条件的kv都已经查找完毕,返回最后一个kv

返回的kv假设不是kv_limit,那么有可能是null或者是下一行的第一个kv.

KeyValue nextKv= populateResult(results,this.storeHeap,limit,currentRow,offset,

length);

假设达到limit的限制时,filter.hasFilterRow的值一定得是false,

否则会throw IncompatibleFilterException

假设达到limit的限制时。返回true,当前row的全部kv查找结束,返回true能够接着向下查找

提示:假设hbase一行数据中可能包括多个kv时,最好是在scan时设置batch的属性,否则会一直查找到全部的kv结束

//Ok, we are good, let's try to get some results from the main heap.

if(nextKv ==KV_LIMIT) {

if(this.filter!= null&& filter.hasFilterRow()){

thrownewIncompatibleFilterException(

"Filterwhose hasFilterRow() returns true is incompatible with scan withlimit!");

}

returntrue;// We hit the limit.

}

是否到结束行,从这一行代码中能够看出,stoprow是不包括的。由于nextKv肯定是下一行row中第一个kv的值

stopRow= nextKv ==null||

isStopRow(nextKv.getBuffer(),nextKv.getRowOffset(),nextKv.getRowLength());

//save that the row was empty before filters applied to it.

finalbooleanisEmptyRow= results.isEmpty();


假设是stopRow,同一时候filter.hasFilterRow返回为true时,

可通过filterRowCells来检查要返回的kvlist,也能够用来改动要返回的kvlist

//We have the part of the row necessary for filtering (all of it,usually).

//First filter with the filterRow(List).

if(filter !=null&& filter.hasFilterRow()){

filter.filterRowCells(results);

}

假设当前row的查找没有找到合法的kv,也就是results的列表没有值,检查是否还有下一行,

假设有,又一次进行查找。否则表示当前region的查找最结尾处,不能再进行查找,返回fasle

if(isEmptyRow){

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

results.clear();

//This row was totally filtered out, if this is NOT the last row,

//we should continue on. Otherwise, nothing else to do.

if(!stopRow)continue;

returnfalse;

}


//Ok, we are done with storeHeap for this row.

//Now we may need to fetch additional, non-essential data into row.

//These values are not needed for filter to work, so we postpone their

//fetch to (possibly) reduce amount of data loads from disk.

if(this.joinedHeap!= null){

..................................进行关联查找的代码,不显示。也不分析

}

} else{

多个store进行关联查询,不分析,通常情况不会有

//Populating from the joined heap was stopped by limits, populate somemore.

populateFromJoinedHeap(results,limit);

}


//We may have just called populateFromJoinedMap and hit the limits. Ifthat is

//the case, we need to call it again on the next next() invocation.

if(joinedContinuationRow!= null){

returntrue;

}

假设这次的查找,results的结果为空。表示没有查找到结果。检查是否还有下一行数据,假设有又一次进行查找,

否则返回false表示此region的查找结束

//Finally, we are done with both joinedHeap and storeHeap.

//Double check to prevent empty rows from appearing in result. It couldbe

//the case when SingleColumnValueExcludeFilter is used.

if(results.isEmpty()){

booleanmoreRows =nextRow(currentRow,offset,length);

if(!moreRows)returnfalse;

if(!stopRow)continue;

}

stoprow时。表示还能够有下一行的数据,也就是能够接着进行next操作。否则表示此region的查找结束

//We are done. Return the result.

return!stopRow;

}

}


UserScan时的ScanQueryMatcher.match方法处理

userscan时的ScanQueryMatchernewRegionScannerImpl(scan,additionalScanners,this);时生成。

在生成StoreScanner时通过例如以下代码生成matcher实例。


matcher= newScanQueryMatcher(scan,scanInfo,columns,

ScanType.USER_SCAN,Long.MAX_VALUE,HConstants.LATEST_TIMESTAMP,

oldestUnexpiredTS);


matcher.isUserScan的值此时为true.


publicMatchCodematch(KeyValuekv) throwsIOException {

检查当前region的查找是否结束。pageFilter就是通过控制此filter中的方法来检查是否须要

if(filter !=null&& filter.filterAllRemaining()){

returnMatchCode.DONE_SCAN;

}


byte[] bytes =kv.getBuffer();

intoffset =kv.getOffset();


intkeyLength =Bytes.toInt(bytes,offset,Bytes.SIZEOF_INT);

offset+= KeyValue.ROW_OFFSET;


intinitialOffset= offset;


shortrowLength =Bytes.toShort(bytes,offset,Bytes.SIZEOF_SHORT);

offset+= Bytes.SIZEOF_SHORT;

检查传入的kv是否是当前行的kv。也就是rowkey是否同样,假设当前的rowkey小于传入的rowkey

表示如今已经next到下一行了。返回DONE,表示当前行查找结束

intret =this.rowComparator.compareRows(row,this.rowOffset,this.rowLength,

bytes,offset,rowLength);

if(ret <=-1) {

returnMatchCode.DONE;

}elseif(ret >=1) {

假设当前的rowkey大于传入的rowkey。表示当前next出来的kv比方今的kv要小,运行nextrow操作。

//could optimize this, if necessary?

//Could also be called SEEK_TO_CURRENT_ROW, but this

//should be rare/never happens.

returnMatchCode.SEEK_NEXT_ROW;

}

是否跳过当前行的其他kv比較。这是一个优化项。

//optimize case.

if(this.stickyNextRow)

returnMatchCode.SEEK_NEXT_ROW;

假设当前行的全部要查找的(scan)column都查找完毕了。其他的当前行中非要scankv

直接不比較,运行nextrow操作。

if(this.columns.done()){

stickyNextRow= true;

returnMatchCode.SEEK_NEXT_ROW;

}


//PassingrowLength

offset+= rowLength;


//Skippingfamily

bytefamilyLength= bytes[offset];

offset+= familyLength+ 1;


intqualLength= keyLength-

(offset- initialOffset)- KeyValue.TIMESTAMP_TYPE_SIZE;

检查当前KVTTL是否过期,假设过期,检查是否SCAN中还有下一个COLUMN,假设有返回SEEK_NEXT_COL

否则返回SEEK_NEXT_ROW

longtimestamp =Bytes.toLong(bytes,initialOffset+ keyLength- KeyValue.TIMESTAMP_TYPE_SIZE);

//check for early out based on timestampalone

if(columns.isDone(timestamp)){

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}


/*

*The delete logic is pretty complicated now.

*This is corroborated by the following:

*1. The store might be instructed to keep deleted rows around.

*2. A scan can optionally see past a delete marker now.

*3. If deleted rows are kept, we have to find out when we can

* remove the delete markers.

*4. Family delete markers are always first (regardless of their TS)

*5. Delete markers should not be counted as version

*6. Delete markers affect puts of the *same* TS

*7. Delete marker need to be version counted together with puts

* they affect

*/

bytetype =bytes[initialOffset+ keyLength– 1];

假设当前KV是删除的KV

if(kv.isDelete()){

此处会进入。把删除的KV加入到DeleteTracker中,默认是ScanDeleteTracker

if(!keepDeletedCells){

//first ignore delete markers if the scanner can do so, and the

//range does not include the marker

//

//during flushes and compactionsalso ignore delete markers newer

//than the readpointof any open scanner, this prevents deleted

//rows that could still be seen by a scanner from being collected

booleanincludeDeleteMarker= seePastDeleteMarkers?

tr.withinTimeRange(timestamp):

tr.withinOrAfterTimeRange(timestamp);

if(includeDeleteMarker

&&kv.getMvccVersion()<= maxReadPointToTrackVersions){

this.deletes.add(bytes,offset,qualLength,timestamp,type);

}

//Can't early out now, because DelFam come before any other keys

}

此处的检查不会进入,userscan不保留删除的数据

if(retainDeletesInOutput

|| (!isUserScan&& (EnvironmentEdgeManager.currentTimeMillis()- timestamp)<= timeToPurgeDeletes)

|| kv.getMvccVersion()> maxReadPointToTrackVersions){

//always include or it is not time yet to check whether it is OK

//to purge deltesor not

if(!isUserScan){

//if this is not a user scan (compaction), we can filter thisdeletemarkerright here

//otherwise (i.e. a "raw" scan) we fall through to normalversion and timerangechecking

returnMatchCode.INCLUDE;

}

} elseif(keepDeletedCells){

if(timestamp< earliestPutTs){

//keeping delete rows, but there are no puts older than

//this delete in the store files.

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}

//else: fall through and do version counting on the

//delete markers

} else{

returnMatchCode.SKIP;

}

//note the following next else if...

//delete marker are not subject to other delete markers

}elseif(!this.deletes.isEmpty()){

假设deleteTracker中不为空时,也就是当前行中有删除的KV,检查当前KV是否是删除的KV

提示:删除的KVcompare时,比正常的KV要小,所以在运行next操作时,deleteKV会先被查找出来。

假设是删除的KV,依据KV的删除类型。假设是版本号被删除,返回SKIP

否则假设SCAN中还有下一个要SCANcolumn时。返回SEEK_NEXT_COL

否则表示当前行没有须要在进行查找的KV,返回SEEK_NEXT_ROW

DeleteResultdeleteResult= deletes.isDeleted(bytes,offset,qualLength,

timestamp);

switch(deleteResult){

caseFAMILY_DELETED:

caseCOLUMN_DELETED:

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

caseVERSION_DELETED:

caseFAMILY_VERSION_DELETED:

returnMatchCode.SKIP;

caseNOT_DELETED:

break;

default:

thrownewRuntimeException("UNEXPECTED");

}

}

检查KV的时间是否在SCAN要查找的时间范围内,

inttimestampComparison= tr.compare(timestamp);

假设大于SCAN的最大时间。返回SKIP

if(timestampComparison>= 1) {

returnMatchCode.SKIP;

}elseif(timestampComparison<= -1) {

假设小于SCAN的最小时间,假设SCAN中还有下一个要SCANcolumn时。返回SEEK_NEXT_COL

否则表示当前行没有须要在进行查找的KV,返回SEEK_NEXT_ROW

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

}

检查当前KVcolumn是否是SCAN中指定的column列表中包括的值,假设是INCLUDE

否则假设SCAN中还有下一个要SCANcolumn时,返回SEEK_NEXT_COL

否则表示当前行没有须要在进行查找的KV,返回SEEK_NEXT_ROW

//STEP 1: Check if the column is part of the requested columns

MatchCodecolChecker= columns.checkColumn(bytes,offset,qualLength,type);

假设columnSCAN中要查找的column之中的一个

if(colChecker== MatchCode.INCLUDE){

ReturnCodefilterResponse= ReturnCode.SKIP;

//STEP 2: Yes, the column is part of the requested columns. Check iffilter is present

if(filter !=null){

运行filter.filterKeyValue操作。并返回filter过滤的结果

//STEP 3: Filter the key value and return if it filters out

filterResponse= filter.filterKeyValue(kv);

switch(filterResponse){

caseSKIP:

returnMatchCode.SKIP;

caseNEXT_COL:

假设SCAN中还有下一个要SCANcolumn时。返回SEEK_NEXT_COL

否则表示当前行没有须要在进行查找的KV,返回SEEK_NEXT_ROW

returncolumns.getNextRowOrNextColumn(bytes,offset,qualLength);

caseNEXT_ROW:

stickyNextRow= true;

returnMatchCode.SEEK_NEXT_ROW;

caseSEEK_NEXT_USING_HINT:

returnMatchCode.SEEK_NEXT_USING_HINT;

default:

//Itmeans it is either include or include and seek next

break;

}

}

/*

* STEP 4: Reaching this stepmeans the column is part of the requested columns and either

* the filter is null or thefilter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.

* Now check the number ofversions needed. This method call returns SKIP, INCLUDE,

* INCLUDE_AND_SEEK_NEXT_ROW,INCLUDE_AND_SEEK_NEXT_COL.

*

* FilterResponse ColumnChecker Desired behavior

* INCLUDE SKIP row has already been included, SKIP.

* INCLUDE INCLUDE INCLUDE

* INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW

* INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL

* INCLUDE_AND_SEEK_NEXT_COLINCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW

*

* In all the above scenarios, wereturn the column checker return value except for

* FilterResponse(INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)

*/


此处主要是检查KV的是否是SCAN的最大版本号内,到这个地方。除非是KV超过了要SCAN的最大版本号,或者KVTTL过期。

否则肯定是会包括此KV的值。


colChecker=

columns.checkVersions(bytes,offset,qualLength,timestamp,type,

kv.getMvccVersion()> maxReadPointToTrackVersions);

//Optimizewith stickyNextRow

stickyNextRow= colChecker== MatchCode.INCLUDE_AND_SEEK_NEXT_ROW? true: stickyNextRow;

return(filterResponse== ReturnCode.INCLUDE_AND_NEXT_COL&&

colChecker== MatchCode.INCLUDE)? MatchCode.INCLUDE_AND_SEEK_NEXT_COL

: colChecker;

}

stickyNextRow= (colChecker== MatchCode.SEEK_NEXT_ROW)?

true

: stickyNextRow;

returncolChecker;

}


原文地址:https://www.cnblogs.com/liguangsunls/p/6779956.html