douhui4699 2016-02-17 09:19
浏览 66
已采纳

Goraft中所有节点的状态

I have a cluster of 4 nodes: 2001, 2002, 2003 and 2004. They are bound together using goraft. Suppose 2001 is the master server. When it fails, another node becomes the leader. What I want is for the node that becomes the new leader to send a message saying "I am the new LEADER". How can I achieve that? I am using goraft with the raftd implementation. The source code is attached below.

main.go - for the client

package main

import (
    "flag"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/server"
    "log"
    "math/rand"
    "os"
    "time"
    "strconv"
)

// Command-line options; they are bound to their flags in init.
var (
    verbose bool   // -v: verbose logging
    trace   bool   // -trace: Raft trace-level logging
    debug   bool   // -debug: Raft debug-level logging
    host    string // -h: hostname
    port    int    // -p: port
    join    string // -join: host:port of the leader to join
)

func init() {
    flag.Parse()
    flag.BoolVar(&verbose, "v", false, "verbose logging")
    flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
    flag.BoolVar(&debug, "debug", false, "Raft debugging")
    flag.StringVar(&host, "h", "localhost", "hostname")
    p,_:=strconv.Atoi(flag.Arg(1))
    flag.IntVar(&port, "p", p, "port")
    flag.StringVar(&join, "join", "", "host:port of leader to join")
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> 
", os.Args[0])
        flag.PrintDefaults()
    }
}

func main() {
    log.SetFlags(0)
    flag.Parse()
    if verbose {
        log.Print("Verbose logging enabled.")
    }
    if trace {
        raft.SetLogLevel(raft.Trace)
        log.Print("Raft trace debugging enabled.")
    } else if debug {
        raft.SetLogLevel(raft.Debug)
        log.Print("Raft debugging enabled.")
    }

    rand.Seed(time.Now().UnixNano())

    // Setup commands.
    raft.RegisterCommand(&command.WriteCommand{})

    // Set the data directory.
    if flag.NArg() == 0 {
        flag.Usage()
        log.Fatal("Data path argument required")
    }
    path := flag.Arg(0)
    if err := os.MkdirAll(path, 0744); err != nil {
        log.Fatalf("Unable to create path: %v", err)
    }

    log.SetFlags(log.LstdFlags)
    s := server.New(path, host, port)
    log.Fatal(s.ListenAndServe("localhost:2001"))
    fmt.Println("I am changing my status");
}

main.go - for the server, i.e. node 2001

package main

import (
    "flag"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/server"
    "log"
    "math/rand"
    "os"
    "time"
    "strconv"
)

// Command-line options; they are bound to their flags in init.
var (
    verbose bool   // -v: verbose logging
    trace   bool   // -trace: Raft trace-level logging
    debug   bool   // -debug: Raft debug-level logging
    host    string // -h: hostname
    port    int    // -p: port
    join    string // -join: host:port of the leader to join
)

func init() {
    flag.Parse()
    flag.BoolVar(&verbose, "v", false, "verbose logging")
    flag.BoolVar(&trace, "trace", false, "Raft trace debugging")
    flag.BoolVar(&debug, "debug", false, "Raft debugging")
    flag.StringVar(&host, "h", "localhost", "hostname")
    p,_:=strconv.Atoi(flag.Arg(1))
    flag.IntVar(&port, "p", p, "port")
    flag.StringVar(&join, "join", "", "host:port of leader to join")
    flag.Usage = func() {
        fmt.Fprintf(os.Stderr, "Usage: %s [arguments] <data-path> 
", os.Args[0])
        flag.PrintDefaults()
    }
}

func main() {
    log.SetFlags(0)
    flag.Parse()
    if verbose {
        log.Print("Verbose logging enabled.")
    }
    if trace {
        raft.SetLogLevel(raft.Trace)
        log.Print("Raft trace debugging enabled.")
    } else if debug {
        raft.SetLogLevel(raft.Debug)
        log.Print("Raft debugging enabled.")
    }

    rand.Seed(time.Now().UnixNano())

    // Setup commands.
    raft.RegisterCommand(&command.WriteCommand{})

    // Set the data directory.
    if flag.NArg() == 0 {
        flag.Usage()
        log.Fatal("Data path argument required")
    }
    path := flag.Arg(0)
    if err := os.MkdirAll(path, 0744); err != nil {
        log.Fatalf("Unable to create path: %v", err)
    }

    log.SetFlags(log.LstdFlags)
    s := server.New(path, host, port)
    log.Fatal(s.ListenAndServe(join))
}

Common Server.go code

package server

import (
    "bytes"
    "encoding/json"
    "fmt"
    "github.com/goraft/raft"
    "github.com/goraft/raftd/command"
    "github.com/goraft/raftd/db"
    "github.com/gorilla/mux"
    "io/ioutil"
    "log"
    "math/rand"
    "net/http"
    "path/filepath"
    "sync"
    "time"
)

// Server couples a Raft server with the HTTP server that acts as its
// transport and exposes the key/value endpoints.
type Server struct {
    // Identity and location of this node.
    name string // node name, persisted in the "name" file under path
    host string // host part of the connection string
    port int    // HTTP port
    path string // data directory

    // Runtime components.
    router     *mux.Router
    raftServer raft.Server
    httpServer *http.Server
    db         *db.DB

    mutex sync.RWMutex
}

// New builds a Server for the given data directory, host and port.
//
// The node name is loaded from the "name" file inside path. If that file
// cannot be read, a random 7-character hex name is generated and written
// there; a failure to write it panics.
func New(path string, host string, port int) *Server {
    srv := &Server{
        host:   host,
        port:   port,
        path:   path,
        db:     db.New(),
        router: mux.NewRouter(),
    }

    nameFile := filepath.Join(path, "name")

    // Reuse the stored name when there is one.
    stored, err := ioutil.ReadFile(nameFile)
    if err == nil {
        srv.name = string(stored)
        return srv
    }

    // Otherwise invent a name and persist it for the next start.
    srv.name = fmt.Sprintf("%07x", rand.Int())[0:7]
    if err = ioutil.WriteFile(nameFile, []byte(srv.name), 0644); err != nil {
        panic(err)
    }
    return srv
}

// connectionString returns the HTTP base URL of this node, built from its
// host and port.
func (s *Server) connectionString() string {
    hostPort := fmt.Sprintf("%s:%d", s.host, s.port)
    return "http://" + hostPort
}

// ListenAndServe starts the Raft server, attaches the node to a cluster and
// then serves HTTP on the configured port.
//
// If leader is non-empty the node joins that leader (host:port), which is
// only allowed while the local log is empty. If leader is empty and the log
// is empty, the node bootstraps a new cluster by joining itself. Otherwise
// the existing log is reused.
//
// Every failure is returned to the caller instead of terminating the process
// from inside the package; the call blocks until the HTTP server stops.
func (s *Server) ListenAndServe(leader string) error {
    var err error

    log.Printf("Initializing Raft Server: %s", s.path)

    // Initialize and start Raft server.
    transporter := raft.NewHTTPTransporter("/raft", 200*time.Millisecond)
    s.raftServer, err = raft.NewServer(s.name, s.path, transporter, nil, s.db, "")
    if err != nil {
        return err
    }
    transporter.Install(s.raftServer, s)
    if err = s.raftServer.Start(); err != nil {
        return err
    }

    if leader != "" {
        // Join to leader if specified.

        log.Println("Attempting to join leader:", leader)

        if !s.raftServer.IsLogEmpty() {
            return fmt.Errorf("Cannot join with an existing log")
        }
        if err = s.Join(leader); err != nil {
            return err
        }

    } else if s.raftServer.IsLogEmpty() {
        // Initialize the server by joining itself.

        log.Println("Initializing new cluster")

        _, err = s.raftServer.Do(&raft.DefaultJoinCommand{
            Name:             s.raftServer.Name(),
            ConnectionString: s.connectionString(),
        })
        if err != nil {
            return err
        }

    } else {
        log.Println("Recovered from log")
    }

    log.Println("Initializing HTTP server")

    // Initialize and start HTTP server.
    s.httpServer = &http.Server{
        Addr:    fmt.Sprintf(":%d", s.port),
        Handler: s.router,
    }

    s.router.HandleFunc("/db/{key}", s.readHandler).Methods("GET")
    s.router.HandleFunc("/db/{key}", s.writeHandler).Methods("POST")
    s.router.HandleFunc("/join", s.joinHandler).Methods("POST")

    log.Println("Listening at:", s.connectionString())

    return s.httpServer.ListenAndServe()
}

// HandleFunc registers handler for pattern on the server's router. It exists
// so that *Server offers a net/http-style HandleFunc method, which the Gorilla
// mux router does not provide with that exact signature.
func (s *Server) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
    s.router.Handle(pattern, http.HandlerFunc(handler))
}

// Join asks the leader of an existing cluster to add this node.
//
// leader is the leader's host:port. The node's name and connection string
// are sent as a JSON-encoded join command to the leader's /join endpoint.
// An error is returned when the command cannot be encoded, the request
// fails, or the leader answers with anything other than 200 OK.
func (s *Server) Join(leader string) error {
    command := &raft.DefaultJoinCommand{
        Name:             s.raftServer.Name(),
        ConnectionString: s.connectionString(),
    }

    var b bytes.Buffer
    if err := json.NewEncoder(&b).Encode(command); err != nil {
        return err
    }
    resp, err := http.Post(fmt.Sprintf("http://%s/join", leader), "application/json", &b)
    if err != nil {
        return err
    }
    defer resp.Body.Close()

    // The join handler reports failures through the status code, so a
    // non-200 answer means the node was not added to the cluster.
    if resp.StatusCode != http.StatusOK {
        // Best effort: the body only enriches the error message, so a read
        // failure here is not worth reporting on its own.
        body, _ := ioutil.ReadAll(resp.Body)
        return fmt.Errorf("join request to %s failed: %s: %s", leader, resp.Status, bytes.TrimSpace(body))
    }

    return nil
}

// joinHandler handles POST /join: it decodes a join command from the request
// body and applies it through the Raft server.
//
// A body that is not valid JSON is answered with 400 Bad Request, because the
// fault lies with the client; a failure to apply the command is answered with
// 500 Internal Server Error. On success nothing is written, which results in
// an implicit 200 OK.
func (s *Server) joinHandler(w http.ResponseWriter, req *http.Request) {
    command := &raft.DefaultJoinCommand{}

    // command is already a pointer, so it is passed to Decode as is.
    if err := json.NewDecoder(req.Body).Decode(command); err != nil {
        http.Error(w, err.Error(), http.StatusBadRequest)
        return
    }
    if _, err := s.raftServer.Do(command); err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }
}

// readHandler handles GET /db/{key}: it writes the value stored under the
// key taken from the URL as the response body.
func (s *Server) readHandler(w http.ResponseWriter, req *http.Request) {
    key := mux.Vars(req)["key"]
    payload := []byte(s.db.Get(key))
    w.Write(payload)
}

// writeHandler handles POST /db/{key}: the request body becomes the value
// written under the key taken from the URL, applied through the Raft server.
//
// An unreadable body yields a bare 400 response; a failure to apply the
// write command yields 400 with the error text.
func (s *Server) writeHandler(w http.ResponseWriter, req *http.Request) {
    // The POST body carries the value.
    body, readErr := ioutil.ReadAll(req.Body)
    if readErr != nil {
        w.WriteHeader(http.StatusBadRequest)
        return
    }

    // Run the write through Raft.
    key := mux.Vars(req)["key"]
    write := command.NewWriteCommand(key, string(body))
    if _, doErr := s.raftServer.Do(write); doErr != nil {
        http.Error(w, doErr.Error(), http.StatusBadRequest)
    }
}

Please give some solutions.

  • 写回答

1条回答 默认 最新

  • douba5540 2016-02-18 05:38
    关注

    I did it.

    I just inserted one new line into the goraft library code, at the point where the leader change is handled.

    To do the same, go to the server.go file of goraft and make the following change.

    Original Server.go - line [287-309]

    // setState switches the server to the given state and notifies listeners.
    //
    // The whole update runs while holding s.mutex. When the new state is
    // Leader, the server records itself as leader and resets syncedPeer to an
    // empty map. A state-change event is always dispatched; a leader-change
    // event is dispatched only when s.leader differs from its previous value.
    func (s *server) setState(state string) {
        s.mutex.Lock()
        defer s.mutex.Unlock()
    
        // Temporarily store previous values.
        prevState := s.state
        prevLeader := s.leader
    
        // Update state and leader.
        s.state = state
        if state == Leader {
            s.leader = s.Name()
            s.syncedPeer = make(map[string]bool)
        }
    
        // Dispatch state and leader change events.
        s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
    
        // In this function s.leader is only reassigned in the Leader branch
        // above, so the values differ here only if this call changed it.
        if prevLeader != s.leader {
            s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
        }
    }
    

    Edited Server.go

    // setState switches the server to the given state and notifies listeners.
    //
    // The whole update runs while holding s.mutex. When the new state is
    // Leader, the server records itself as leader and resets syncedPeer to an
    // empty map. A state-change event is always dispatched; when s.leader
    // differs from its previous value, a message is printed to standard output
    // and a leader-change event is dispatched.
    func (s *server) setState(state string) {
        s.mutex.Lock()
        defer s.mutex.Unlock()
    
        // Temporarily store previous values.
        prevState := s.state
        prevLeader := s.leader
    
        // Update state and leader.
        s.state = state
        if state == Leader {
            s.leader = s.Name()
            s.syncedPeer = make(map[string]bool)
        }
    
        // Dispatch state and leader change events.
        s.DispatchEvent(newEvent(StateChangeEventType, s.state, prevState))
    
        // In this function s.leader is only reassigned in the Leader branch
        // above, so the values differ here only if this call changed it.
        if prevLeader != s.leader {
            // Added line: announce the new leader on this node's console.
            // The print happens while s.mutex is still held (see the defer).
            // NOTE(review): s.connectionString is used without parentheses,
            // so it is presumably a string field of the raft server struct,
            // not a method; confirm against the goraft sources.
            // NOTE(review): registering a listener for LeaderChangeEventType
            // from application code may avoid patching the library; verify
            // the listener API before relying on it.
            fmt.Println("I am the Leader..!!  ",s.connectionString,"   ",s.path)
            s.DispatchEvent(newEvent(LeaderChangeEventType, s.leader, prevLeader))
        }
    }
    

    With this change, the connection string and the storage path of the current server are printed on the console of the node that has just become the active master.

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论

报告相同问题?

悬赏问题

  • ¥15 这个电路是如何实现路灯控制器的,原理是什么,怎么求解灯亮起后熄灭的时间如图?
  • ¥15 matlab数字图像处理频率域滤波
  • ¥15 在abaqus做了二维正交切削模型,给刀具添加了超声振动条件后输出切削力为什么比普通切削增大这么多
  • ¥15 ELGamal和paillier计算效率谁快?
  • ¥15 file converter 转换格式失败 报错 Error marking filters as finished,如何解决?
  • ¥15 ubuntu系统下挂载磁盘上执行./提示权限不够
  • ¥15 Arcgis相交分析无法绘制一个或多个图形
  • ¥15 关于#r语言#的问题:差异分析前数据准备,报错Error in data[, sampleName1] : subscript out of bounds请问怎么解决呀以下是全部代码:
  • ¥15 seatunnel-web使用SQL组件时候后台报错,无法找到表格
  • ¥15 fpga自动售货机数码管(相关搜索:数字时钟)