Scala与Mongodb实践4-----数据库操具体应用

目的:在实践3中搭建了运算环境,这里学会如何使用该环境进行具体的运算和相关的排序组合等。

  • 由数据库mongodb操作如find,aggregate等可知它们的返回类型是FindObservable、AggregateObservable等,可看作每个数据库的操作都是Observable,
  • 因为之sort和projection等相应的操作都是在API中的FondObsevable里面的,且返回FondObsevable。
/**
   * Sets a document describing the fields to return for all matching documents.
   *
   * [[http://docs.mongodb.org/manual/reference/method/db.collection.find/ Projection]]
   * @param projection the project document, which may be null.
   * @return this
   */
  def projection(projection: Bson): FindObservable[TResult] = {
    wrapped.projection(projection)
    this
  }

  /**
   * Sets the sort criteria to apply to the query.
   *
   * [[http://docs.mongodb.org/manual/reference/method/cursor.sort/ Sort]]
   * @param sort the sort criteria, which may be null.
   * @return this
   */
  def sort(sort: Bson): FindObservable[TResult] = {
    wrapped.sort(sort)
    this
  }
  •  可以引入新的类型,能够便于使用
 type MGOFilterResult =  FindObservable[Document] => FindObservable[Document]

 一、简单查询操作:排序、投影

 1、排序

  • 引入包
    import org.mongodb.scala.model.Sorts._
  • 在Sorts里面能看到里面的是具体方法
 /**
   * Create a sort specification for an ascending sort on the given fields.
   *
   * @param fieldNames the field names, which must contain at least one
   * @return the sort specification
   * @see [[http://http://docs.mongodb.org/manual/reference/operator/meta/orderby Sort]]
   */
  def ascending(fieldNames: String*): Bson = JSorts.ascending(fieldNames.asJava)

  /**
   * Create a sort specification for an ascending sort on the given fields.
   *
   * @param fieldNames the field names, which must contain at least one
   * @return the sort specification
   * @see [[http://http://docs.mongodb.org/manual/reference/operator/meta/orderby Sort]]
   */
  def descending(fieldNames: String*): Bson = JSorts.descending(fieldNames.asJava)

  /**
   * Create a sort specification for the text score meta projection on the given field.
   *
   * @param fieldName the field name
   * @return the sort specification
   * @see [[http://http://docs.mongodb.org/manual/reference/operator/projection/meta/#sort textScore]]
   */
  def metaTextScore(fieldName: String): Bson = JSorts.metaTextScore(fieldName)

  /**
   * Combine multiple sort specifications.  If any field names are repeated, the last one takes precendence.
   *
   * @param sorts the sort specifications
   * @return the combined sort specification
   */
  def orderBy(sorts: Bson*): Bson = JSorts.orderBy(sorts.asJava)
  • 应用如下:
  import org.mongodb.scala.model.Projections._
  import org.mongodb.scala.model.Sorts._
    val sort:MGOFilterResult=a=>a.sort(descending("age"))//“age”降序
    val F1:Future[List[Document]] = DAO(dxt.setCommand(Find(andThen = Some(sort))))

   F1.onComplete {
      case Success(docs:List[Document]) => 
            docs.foreach(Person().fromDocument(_).toSink())
            println("-------------------------------")
      case Failure(err) => println(s"显示错误:${err.getMessage}")
     }

2、投影

  • 选择需要的字段(首页只出现商品的图片,价钱、不全部出现相关全部信息。。。)
  • 从数据库内获取字段,其他的不显示出来,这里需要对Model里面的toSink进行一些修改,使它在没有获取对应数据时,不在控制台那里显示
  def toSink()(implicit mat: ActorMaterializer) = {
      println("---------------------------------------")
      if (ctx.lastName!=null) {
        println(s"LastName: ${ctx.lastName}")}
      if (ctx.firstName!=null) {
        println(s"firstName: ${ctx.firstName}")}
      if (ctx.age!=0) {               //这里是不能设置为0的!!!其他的是空的就不理它
      println(s"age: ${ctx.age}")}
      if (!ctx.phones.isEmpty) {
      println(s"phones: ${ctx.phones}")}
      if (!ctx.address.isEmpty) {
      println(s"address: ${ctx.address}")}
      if (!ctx.picture.isEmpty) {
        val path = s"/img/${ctx.firstName}.jpg"
        FileStreaming.ByteArrayToFile(ctx.picture, path)
        println(s"picture saved to: ${path}")
      }
    }
  • 应用如下
    import org.mongodb.scala.model.Projections._
    val proj: MGOFilterResult = a => a.projection(and(include("firstName"),excludeId()))
    val F2:Future[List[Document]] = DAO(dxt.setCommand(Find(andThen = Some(proj))))

    F2.onComplete {
      case Success(docs:List[Document]) => docs.foreach(Person().fromDocument(_).toSink())
        println("-------------------------------")
      case Failure(err) => println(s"显示错误:${err.getMessage}")
    }

 二、数据库操作的使用手册

  • 在com.mongodb.client.model包中包含CountOptions等Options,类内有各种选项。原本在casbah包内的,casbah【 end-of-life (EOL)】,迁移mongo-scala-driver。在下列的包中:

1、Count

//mongodb命令:db.personal.count({"age":{$gt:18}}) ==》结果:1
 import  org.mongodb.scala.model.Filters._
 import com.mongodb.client.model.CountOptions._
  val countOptions:CountOptions=new CountOptions().skip(1)
  val q=dxt.setCommand(Count(Some(gt("age",18)),Some(countOptions)))
  DAO(q).foreach(println)//println:0

2、Distict

 

3、Find

//db.personal.findOne()
val p=new Person()
val ctxFindFirst :Future[Person]=DAO(dxt.setCommand(Find(converter=Some(p.fromDocument _),firstOnly=true)))
ctxFindFirst.andThen {
        case Success(doc) =>doc.toSink()
          //foreach(Person().fromDocument(_).toSink())
          println("-------------------------------")
        case Failure(err) => println(s"显示错误:${err.getMessage}")
      }
===---------------------------------------
LastName: Wang
firstName: Susan
age: 18
phones: List(BsonString{value='137110998'}, BsonString{value='189343661'})
address: List(Address(Sz,101992), Address(gz,231445))
picture saved to: /img/Susan.jpg
-------------------------------

4、Aggregate

//db.mycol.aggregate([{$group : {_id : "$lastName", num_tutorial : {$sum : 1}}}])
 import org.mongodb.scala.model.Aggregates._

      val a:Bson=group("$lastName", Accumulators.sum("num_tutorial", 1))
      val q:Future[List[Person]]=DAO(dxt.setCommand(Aggregate(Seq(a))))
      q.onComplete{
         case Success(value)=>value.foreach(println)
       }

注意:从org.mongodb.scala的API中可得到相关的信息,它们可以返回Bson的,但是也可以在FindObservable中看到,但是此时它们返回的类型不同。

5、MapReduce

//db.personal.mapReduce( 
   function() { emit(this.lastName,1); }, 
   function(key, values) {return Array.sum(values)}, 
      {  
         query:{firstName:"j"},  
         out:"total" 
      }
)

 

6、Insert

 //db.collection.insertOne()
 // db.collection.insertMany()

 val p1=Person("","",12).toDocument
 val p2=Person("","",23).toDocument
 val p=DAO(dxt.setCommand(Insert(Seq(p1,p2))))

 

7、Delete

//db.personal.remove({"age":23},1)
val p = DAO(dxt.setCommand(Delete(equal("age",23),None,true)))
  p.onComplete{
    case Success(value)=>println("OK")
  }

8、Update

 //db.personal.update({"lastName":"Wang"},{$set:{"age":18}})
 org.mongodb.scala.model.Updates._
  val D :Future[List[Person]]=DAO(dxt.setCommand(Update(equal("age",18),set("age","12"),None,true)))

  D.onComplete {
    case Success(docs) =>println("OK")
      println("-------------------------------")
    case Failure(err) => println(s"显示错误:${err.getMessage}")
  }

 

9、BulkWrite

 

10、DropCollection

//db.dbs.drop()
val q=DAO(dxt.setCommand(DropCollection("dbs")))
 q.onComplete{
   case Success(value)=>print("成功!")
 }

11、CreateCollection

//db.createCollection("dbs")
val q=DAO(dxt.setCommand(CreateCollection("dbs")))
//还有options的选项,如count那里一样添加就好了
       q.onComplete{
         case Success(value)=>print("成功!")
       }

12、ListCollection

//show tables
val q:Future[List[Document]]=DAO(dxt.setCommand(ListCollection("mydb")))
        q.foreach(println)//不过出来的是比较全面信息,什么type、index都有

13、CreateView

//db.createView("View1","personal",[{$group : {_id : "$lastName", num_tutorial : {$sum : 1}}}])
import org.mongodb.scala.model.Aggregates._

  val a:Bson=group("$lastName", Accumulators.sum("num_tutorial", 1))
  val q:Future[List[Person]]=DAO(dxt.setCommand(CreateView("View1","personal",Seq(a))))
  q.onComplete{
    case Success(value)=>println("OK")
  }

14、CreateIndex

//db.personal.createIndex({"age":-1},{background: true})?
  import  org.mongodb.scala.model.Indexes._
  import com.mongodb.client.model.IndexOptions._


  val countOptions:IndexOptions=new IndexOptions().background(true)
  val q=dxt.setCommand(CreateIndex(descending("age"),Some(IndexOptions)))//注意升降
  DAO(q).onComplete{
    case Success(v)=>println("成功")
  }

15、DropIndexByName

//db.personal.dropIndex("date_1")
val q=dxt.setCommand(DropIndexByName("date_1"))
  DAO(q).onComplete{
    case Success(v)=>println("成功")
  }

16、DropIndexByKey

//主要是升降ascending、descending
 val q=dxt.setCommand(DropIndexByKey(ascending("age")))
  DAO(q).onComplete{
    case Success(v)=>println("成功")
  }

17、DropAllIndexes

  val q:Future[List[Person]]=DAO(dxt.setCommand(DropAllIndexes()))
  q.onComplete{
    case Success(value)=>println("OK")
  }






原文地址:https://www.cnblogs.com/0205gt/p/11206088.html