Go 外部排序-网络版

目录结构

main.go

package main

import (
    "NetworkSort/pipeline"
    "fmt"
    "os"
    "bufio"
)

// main generates a file of n random 64-bit integers ("large.in"),
// then reads the file back and prints the first 100 values as a
// quick sanity check of the write/read round trip.
func main() {
	const filename = "large.in"
	const n = 100000000

	// Phase 1: stream n random integers to disk as 8-byte big-endian words.
	file, err := os.Create(filename)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	p := pipeline.RandomSource(n)
	writer := bufio.NewWriter(file)
	pipeline.WriterSink(writer, p)
	// Flush can fail (e.g. disk full); the original ignored this error.
	if err := writer.Flush(); err != nil {
		panic(err)
	}

	// Phase 2: read the file back and print the first 100 values.
	file, err = os.Open(filename)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	p = pipeline.ReaderSource(bufio.NewReader(file), -1)
	count := 0
	for v := range p {
		fmt.Println(v)
		count++
		if count >= 100 {
			break
		}
	}
}

sort.go

package main

import (
    "os"
    "NetworkSort/pipeline"
    "bufio"
    "fmt"
    "strconv"
)

// main sorts the integers stored in small.in through a network-backed
// external-sort pipeline, writes the result to small.out, and prints
// the sorted output for inspection.
func main() {
	const (
		inFile  = "small.in"
		outFile = "small.out"
	)
	sorted := createNetworkPipeline(inFile, 512, 4)
	writeToFile(sorted, outFile)
	printFile(outFile)
}

// printFile opens filename and prints at most the first 100 integers
// stored in it (8-byte big-endian each) as a sanity check.
func printFile(filename string) {
	file, err := os.Open(filename)
	if err != nil {
		panic(err)
	}
	defer file.Close()
	// Wrap the file in a bufio.Reader: ReaderSource issues many 8-byte
	// reads, and the original passed the raw *os.File here (one syscall
	// per read), unlike every other call site in this project.
	p := pipeline.ReaderSource(bufio.NewReader(file), -1)
	count := 0
	for v := range p {
		fmt.Println(v)
		count++
		if count >= 100 {
			break
		}
	}
}

// writeToFile drains channel p and writes every value to filename as
// an 8-byte big-endian integer, flushing the buffered writer on return.
func writeToFile(p <-chan int, filename string) {
	out, err := os.Create(filename)
	if err != nil {
		panic(err)
	}
	defer out.Close()

	buffered := bufio.NewWriter(out)
	defer buffered.Flush()
	pipeline.WriterSink(buffered, p)
}

// createNetworkPipeline splits filename (fileSize bytes) into chunkCount
// chunks, sorts each chunk in memory, serves every sorted chunk on its
// own local TCP port, then reads the chunks back over the network and
// merges them. It returns the channel producing the fully sorted stream.
func createNetworkPipeline(filename string, fileSize, chunkCount int) <-chan int {
	chunkSize := fileSize / chunkCount
	pipeline.Init()

	var sortAddr []string
	for i := 0; i < chunkCount; i++ {
		file, err := os.Open(filename)
		if err != nil {
			panic(err)
		}
		// NOTE: file is deliberately not closed here; the ReaderSource
		// goroutine keeps reading from it after this function returns.
		file.Seek(int64(i*chunkSize), 0)
		source := pipeline.ReaderSource(bufio.NewReader(file), chunkSize)
		addr := "127.0.0.1:" + strconv.Itoa(7000+i)
		fmt.Println("addr:", addr)
		pipeline.NetworkSink(addr, pipeline.InMemorySort(source))
		sortAddr = append(sortAddr, addr)
	}

	// BUG FIX: the original had an unconditional `return nil` here, making
	// everything below unreachable — callers then ranged over a nil channel
	// and blocked forever.
	var sortResults []<-chan int
	for _, addr := range sortAddr {
		sortResults = append(sortResults, pipeline.NetworkSource(addr))
	}
	return pipeline.MergeN(sortResults...)
}

nodes.go

package pipeline

import (
    "sort"
    "io"
    "encoding/binary"
    "math/rand"
    "time"
    "fmt"
)

// startTime is the timing baseline for this package; the pipeline nodes
// below print elapsed times relative to it for rough performance tracing.
var startTime time.Time

// Init resets the timing baseline. Call it once before building a
// pipeline so the "... done" log lines report meaningful durations.
func Init() {
    startTime = time.Now()
}

// ArraySource returns a channel that yields every value of a in order
// and is closed once all values have been sent.
func ArraySource(a ...int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		defer close(out)
		for i := 0; i < len(a); i++ {
			out <- a[i]
		}
	}()
	return out
}

// InMemorySort drains in completely, sorts the values ascending in
// memory, then re-emits them on the returned channel (closed when done).
// Elapsed times relative to startTime are logged for tracing.
func InMemorySort(in <-chan int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		// Read everything into memory; the input stream must be finite.
		var a []int
		for v := range in {
			a = append(a, v)
		}
		// time.Since is the idiomatic form of time.Now().Sub(startTime).
		fmt.Println("Read done:", time.Since(startTime))

		sort.Ints(a)
		fmt.Println("InMemSort done:", time.Since(startTime))

		// Emit the sorted values.
		for _, v := range a {
			out <- v
		}
		close(out)
	}()
	return out
}


// Merge merges two ascending input channels in1 and in2 into a single
// ascending output channel, which is closed once both inputs are
// exhausted. The elapsed time is logged when the merge completes.
func Merge(in1, in2 <-chan int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		v1, ok1 := <-in1
		v2, ok2 := <-in2
		for ok1 || ok2 {
			// Take from in1 when in2 is exhausted or v1 is the smaller head.
			if !ok2 || (ok1 && v1 <= v2) {
				out <- v1
				v1, ok1 = <-in1
			} else {
				out <- v2
				v2, ok2 = <-in2
			}
		}
		close(out)
		// time.Since is the idiomatic form of time.Now().Sub(startTime).
		fmt.Println("Merge done:", time.Since(startTime))
	}()
	return out
}


func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
    out := make(chan int, 1024)
    go func() {
        buffer := make([]byte, 8)
        bytesRead := 0
        for {
            n, err := reader.Read(buffer)
            bytesRead += n
            if n > 0 {
                v := int(binary.BigEndian.Uint64(buffer))
                out <- v
            }
            if err != nil  || (chunkSize != -1 && bytesRead >= chunkSize){
                break
            }
        }
        close(out)
    }()
    return out
}


func WriterSink(writer io.Writer, in <-chan int) {
    for v := range in {
        buffer := make([]byte, 8)
        binary.BigEndian.PutUint64(buffer, uint64(v))
        writer.Write(buffer)
    }
}


// RandomSource emits count pseudo-random integers on the returned
// channel, which is closed afterwards.
func RandomSource(count int) <-chan int {
	// Buffer the channel like every other node in this package; the
	// original's unbuffered channel forced a synchronous handoff per value.
	out := make(chan int, 1024)
	go func() {
		for i := 0; i < count; i++ {
			out <- rand.Int()
		}
		close(out)
	}()
	return out
}


// MergeN merges any number of ascending input channels into a single
// ascending output channel by recursively pairing them with Merge.
// With zero inputs it returns an already-closed empty channel.
func MergeN(inputs ...<-chan int) <-chan int {
	// BUG FIX: the original recursed forever on zero inputs — MergeN()
	// split into two empty halves with no terminating case.
	if len(inputs) == 0 {
		out := make(chan int)
		close(out)
		return out
	}
	if len(inputs) == 1 {
		return inputs[0]
	}
	m := len(inputs) / 2
	// Merge inputs[0:m] and inputs[m:] recursively.
	return Merge(MergeN(inputs[:m]...), MergeN(inputs[m:]...))
}

network_nodes.go

package pipeline

import (
    "net"
    "bufio"
)

// NetworkSink listens on addr, accepts a single TCP connection, and
// streams every value from in to that connection in binary form.
// Listening happens synchronously so a dialer can connect immediately
// after this call returns; accepting and writing run in the background.
func NetworkSink(addr string, in <-chan int) {
	listener, err := net.Listen("tcp", addr)
	if err != nil {
		panic(err)
	}
	go func() {
		defer listener.Close()

		conn, acceptErr := listener.Accept()
		if acceptErr != nil {
			panic(acceptErr)
		}
		defer conn.Close()

		buffered := bufio.NewWriter(conn)
		defer buffered.Flush()
		WriterSink(buffered, in)
	}()
}

// NetworkSource dials addr over TCP and re-emits every integer received
// from the remote side on the returned channel (closed on EOF).
func NetworkSource(addr string) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)

		conn, err := net.Dial("tcp", addr)
		if err != nil {
			panic(err)
		}
		values := ReaderSource(bufio.NewReader(conn), -1)
		for v := range values {
			out <- v
		}
	}()
	return out
}
原文地址:https://www.cnblogs.com/vincenshen/p/9236732.html