Mongodb总结5-通过装饰模式,用Mongodb解决Hbase的不稳定问题

最近继续学习Mongodb的根本原因,是为了解决今天的问题。
项目中用到了Hbase,生产环境服务器用了3台,但是不够稳定,每2天左右,就连不上了。
重启就好了,当然,这是一个历史遗留问题。
我在想,是不是连接没有关闭,每次都是建立新的连接?
瞅瞅Java访问Hbase的代码,都close了额。


原来的Hbase,用Java访问,有add/update、get、getList3个接口。
现在要加上Mongodb存储,尽可能保证Hbase和Mongodb数据同步。
优先使用Mongodb中的数据,其次才使用HBase中的数据。


今后有可能不再使用Hbase。


在项目刚刚启动的时候,需要同步HBase中的数据到Mongodb。
简化代码如下
public class ProjectDetailClient {
private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient();
private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient();

// 2个都增加
public void add(ProjectDetail projectDetail) {
}


可以这么理解,原来直接使用ProjectDetailHbaseClient,方法名称都一样。
后台增加了ProjectDetailMongodbClient,方法的实现也一样,可以看作是一套接口的2套实现。
ProjectDetailClient的add等具体方法中,会处理2个接口的调用、数据同步等逻辑问题。


完整代码如下

package com.hanhai.zrb.api.mongodb;

import java.util.List;

import org.apache.log4j.Logger;

import casia.isiteam.zrb.hbase.client.ProjectDetailHbaseClient;

import com.hanhai.zrb.model.project.ProjectDetail;

public class ProjectDetailClient {
	private ProjectDetailHbaseClient hbase = new ProjectDetailHbaseClient();
	private ProjectDetailMongodbClient mongodb = new ProjectDetailMongodbClient();
	private Logger log = Logger.getLogger(getClass());

	// 2个都增加
	public void add(ProjectDetail projectDetail) {
		log.info("Add ProjectDetail for hbase.");
		hbase.insertProjectDetail(projectDetail);
		log.info("Add ProjectDetail for mongodb.");
		mongodb.add(projectDetail);
	}

	// 2个都更新
	public void update(ProjectDetail projectDetail) {
		if (projectDetail.getId() == null) {
			log.error("ProjectDetail is is null,Cantnot update~");
			return;
		}
		Long id = projectDetail.getId();
	
		ProjectDetail one = mongodb.get(id);
		// Mongodb,如果存在,更新
		if (one != null) {
			log.info("Update ProjectDetail,Mongodb exists,id="+id);
			mongodb.update(projectDetail);
		}
		// 不存在,就增加
		else {
			log.info("Update ProjectDetail,Mongodb not exists,id="+id);
			mongodb.add(projectDetail);
		}
		
		// hbase增加和更新是同一个接口
		log.info("Update ProjectDetail for hbase,id="+id);
		hbase.insertProjectDetail(projectDetail);
	}

	// 2个都查询,优先使用Mongodb
	public ProjectDetail get(long id) {
		ProjectDetail one = null;
		ProjectDetail hbaseOne = hbase.getProjectDetail(id);
		ProjectDetail mongodbOne = mongodb.get(id);
		if (mongodbOne != null) {
			one = mongodbOne;
			log.info("Project Detail,Mongodb exists,Use Mongodb," + one);
		} else if (hbaseOne != null) {
			one = hbaseOne;
			log.info("Project Detail,Mongodb not exists,Use Hbase," + one);
			log.info("Add Project Detail To Mongodb");
			// 同步Hbase中的数据到Mongodb
			mongodb.add(hbaseOne);
		}
		return one;
	}

	// 2个都查询,优先使用Mongodb
	public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) {
		List<ProjectDetail> list = null;
		List<ProjectDetail> hbaseList = hbase.getProjectInfoBasic(idList);
		List<ProjectDetail> mongodbList = mongodb.getProjectInfoBasic(idList);
		// 优先使用Mongodb中的,条件,Mongodb中的个数不小于hbase中的
		if (mongodbList != null) {
			int size = mongodbList.size();
			if (hbaseList == null || hbaseList.size() <= size) {
				list = mongodbList;
				log.info("ProjectDetail list,Use MongodbList,size=" + size);
			}else{
				list = hbaseList;
				log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size()+",mongodb count "+size+" < hbase count "+hbaseList.size());
			}
		}
		// 其次使用Hbase中的,不会同步hbase中的数据到Mongodb
		else if (hbaseList != null) {
			list = hbaseList;
			log.info("ProjectDetail list,Use HbaseList,size=" + hbaseList.size());
		}
		return list;
	}
}

package com.hanhai.zrb.api.mongodb;

import java.util.ArrayList;
import java.util.List;

import org.apache.log4j.Logger;

import com.hanhai.zrb.model.project.ProjectDetail;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.WriteResult;

public class ProjectDetailMongodbClient {

	public static final String CON_NAME = "projectDetail";

	private Logger log = Logger.getLogger(getClass());

	public void add(ProjectDetail projectDetail) {
		DBCollection con = getCon();
		add(con, projectDetail);
	}

	private DBCollection getCon() {
		DB db = MongoUtil.db();
		DBCollection con = db.getCollection(CON_NAME);
		return con;
	}

	// 增加
	private DBCollection add(DBCollection projectDetailCollection,
			ProjectDetail projectDetail) {
		DBObject object = BeanUtil.bean2DBObject(projectDetail);
		WriteResult wr = projectDetailCollection.insert(object);
		CommandResult cr = wr.getLastError();
		log.info("Add new projectDetail,result:" + cr);
		return projectDetailCollection;
	}

	public void update(ProjectDetail projectDetail) {
		DBCollection con = getCon();
		update(con, projectDetail);
	}

	// 修改
	private void update(DBCollection collection, ProjectDetail projectDetail) {
		if (projectDetail.getId() == null) {
			log.error("Update projectDetail,must have a unique id");
			return;
		}

		BasicDBObject updateCondition = new BasicDBObject();
		updateCondition.append("id", projectDetail.getId());

		DBObject newObject = BeanUtil.bean2DBObject(projectDetail);

		DBObject updateSetValue = new BasicDBObject("$set", newObject);
		WriteResult wr = collection.update(updateCondition, updateSetValue);
		log.info("Update new projectDetail,result:" + wr);
	}

	public ProjectDetail get(long id) {
		DBCollection con = getCon();
		ProjectDetail projectDetail = findById(con, id);
		return projectDetail;
	}

	// 从集合中,根据ID查找
	private ProjectDetail findById(DBCollection collection, Long id) {
		BasicDBObject searchProjectDetailById = new BasicDBObject();
		searchProjectDetailById.append("id", id);
		ProjectDetail projectDetailBefore = null;
		// findOne方法更简单一些
		DBCursor cursor = collection.find(searchProjectDetailById);
		while (cursor.hasNext()) {
			DBObject articleObject = cursor.next();
			if (articleObject != null) {
				projectDetailBefore = objectToArticle(articleObject);
				String internalId = articleObject.get("_id").toString();
				projectDetailBefore.setMongoId(internalId);
			}
		}
		cursor.close();
		return projectDetailBefore;
	}

	// 对象转换
	private ProjectDetail objectToArticle(DBObject object) {
		ProjectDetail projectDetail = new ProjectDetail();
		// 用工具方法转换,手动转换,需要判断类型,比较麻烦
		projectDetail = BeanUtil.dbObject2Bean(object, projectDetail);
		return projectDetail;
	}

	public List<ProjectDetail> getProjectInfoBasic(List<Long> idList) {
		DBCollection con = getCon();
		List<ProjectDetail> list = findByIdList(con, idList);
		return list;
	}

	// 根据ID集合查找
	private List<ProjectDetail> findByIdList(DBCollection collection,
			List<Long> idList) {
		BasicDBList values = new BasicDBList();
		values.addAll(idList);

		DBObject inQuery = new BasicDBObject("$in", values);

		DBObject con = new BasicDBObject();
		con.put("id", inQuery);
		DBCursor cursorIdArray = collection.find(con);

		List<ProjectDetail> projectDetailList = new ArrayList<ProjectDetail>();
		while (cursorIdArray.hasNext()) {
			DBObject articleObject = cursorIdArray.next();
			ProjectDetail projectDetail = new ProjectDetail();
			BeanUtil.dbObject2Bean(articleObject, projectDetail);
			String mongoId = articleObject.get("_id").toString();
			projectDetail.setMongoId(mongoId);

			projectDetailList.add(projectDetail);
		}
		return projectDetailList;
	}

}



ProjectDetailHbaseClient代码较为复杂,和ProjectDetailMongodbClient类似,不再贴了。

原文地址:https://www.cnblogs.com/qitian1/p/6462691.html