MIT 6.824 · Lecture 2: RPC and Threads #
Lecture Notes: https://pdos.csail.mit.edu/6.824/notes/l-rpc.txt
Why go in 6.824? #
- good support for threads / RPC
- Garbage collector: convenient
- Types safe
- Simple to learn
- Compiled
- The teacher (Frans Kaashoek) enjoys writing go programs :)
Thread of execution #
- A thread contains PC(program counter), stack, registers etc.
- threads can share memory
- runtime can do the operations to thread, like start/go, exit, stop, resume
Why threads? #
express concurrency:
- I/O concurrency
- multi-core parallelism
- convenience
In go, the threads(goroutine) are very lightweight, you are encouraged to create thread as you go.
Thread challenges #
- Race conditions
- two ways to address races:
- avoid sharing
- use locks
- two ways to address races:
- Coordination: channels or condiiton variables
- Deadlock
Go: and challenges #
Two plans to handles these concurrency challenges:
- Channels (no-sharing)
- Locks + condition variables (shared memeory)
Crawler #
Goals:
- I/O concurrenty
- Fetch a URL once
- Exploit parallelism
Three examples: sequential, mutex, channel.
crawler.go
package main
import (
"fmt"
"sync"
)
//
// Several solutions to the crawler exercise from the Go tutorial
// https://tour.golang.org/concurrency/10
//
//
// Serial crawler
//
func Serial(url string, fetcher Fetcher, fetched map[string]bool) {
if fetched[url] {
return
}
fetched[url] = true
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
for _, u := range urls {
Serial(u, fetcher, fetched)
}
return
}
//
// Concurrent crawler with shared state and Mutex
//
type fetchState struct {
mu sync.Mutex
fetched map[string]bool
}
func ConcurrentMutex(url string, fetcher Fetcher, f *fetchState) {
f.mu.Lock()
already := f.fetched[url]
f.fetched[url] = true
f.mu.Unlock()
if already {
return
}
urls, err := fetcher.Fetch(url)
if err != nil {
return
}
var done sync.WaitGroup
for _, u := range urls {
done.Add(1)
//u2 := u
//go func() {
// defer done.Done()
// ConcurrentMutex(u2, fetcher, f)
//}()
go func(u string) {
defer done.Done()
ConcurrentMutex(u, fetcher, f)
}(u)
}
done.Wait()
return
}
func makeState() *fetchState {
f := &fetchState{}
f.fetched = make(map[string]bool)
return f
}
//
// Concurrent crawler with channels
//
func worker(url string, ch chan []string, fetcher Fetcher) {
urls, err := fetcher.Fetch(url)
if err != nil {
ch <- []string{}
} else {
ch <- urls
}
}
func coordinator(ch chan []string, fetcher Fetcher) {
n := 1
fetched := make(map[string]bool)
for urls := range ch {
for _, u := range urls {
if fetched[u] == false {
fetched[u] = true
n += 1
go worker(u, ch, fetcher)
}
}
n -= 1
if n == 0 {
break
}
}
}
func ConcurrentChannel(url string, fetcher Fetcher) {
ch := make(chan []string)
go func() {
ch <- []string{url}
}()
coordinator(ch, fetcher)
}
//
// main
//
func main() {
fmt.Printf("=== Serial===\n")
Serial("http://golang.org/", fetcher, make(map[string]bool))
fmt.Printf("=== ConcurrentMutex ===\n")
ConcurrentMutex("http://golang.org/", fetcher, makeState())
fmt.Printf("=== ConcurrentChannel ===\n")
ConcurrentChannel("http://golang.org/", fetcher)
}
//
// Fetcher
//
type Fetcher interface {
// Fetch returns a slice of URLs found on the page.
Fetch(url string) (urls []string, err error)
}
// fakeFetcher is Fetcher that returns canned results.
type fakeFetcher map[string]*fakeResult
type fakeResult struct {
body string
urls []string
}
func (f fakeFetcher) Fetch(url string) ([]string, error) {
if res, ok := f[url]; ok {
fmt.Printf("found: %s\n", url)
return res.urls, nil
}
fmt.Printf("missing: %s\n", url)
return nil, fmt.Errorf("not found: %s", url)
}
// fetcher is a populated fakeFetcher.
var fetcher = fakeFetcher{
"http://golang.org/": &fakeResult{
"The Go Programming Language",
[]string{
"http://golang.org/pkg/",
"http://golang.org/cmd/",
},
},
"http://golang.org/pkg/": &fakeResult{
"Packages",
[]string{
"http://golang.org/",
"http://golang.org/cmd/",
"http://golang.org/pkg/fmt/",
"http://golang.org/pkg/os/",
},
},
"http://golang.org/pkg/fmt/": &fakeResult{
"Package fmt",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
"http://golang.org/pkg/os/": &fakeResult{
"Package os",
[]string{
"http://golang.org/",
"http://golang.org/pkg/",
},
},
}
RPC: remote procedure call #
Goal: RPCs behave roughly similar to procedure calls.
Client Server
----------- | -----------------
z := fn(x, y) ---> fn(x, y int) {
| return x + y
| }
Example: simple key-value server
kv.go
package main
import (
"fmt"
"log"
"net"
"net/rpc"
"sync"
)
//
// Common RPC request/reply definitions
//
const (
OK = "OK"
ErrNoKey = "ErrNoKey"
)
type Err string
type PutArgs struct {
Key string
Value string
}
type PutReply struct {
Err Err
}
type GetArgs struct {
Key string
}
type GetReply struct {
Err Err
Value string
}
//
// Client
//
func connect() *rpc.Client {
client, err := rpc.Dial("tcp", ":1234")
if err != nil {
log.Fatal("dialing:", err)
}
return client
}
func get(key string) string {
client := connect()
args := GetArgs{"subject"}
reply := GetReply{}
err := client.Call("KV.Get", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
return reply.Value
}
func put(key string, val string) {
client := connect()
args := PutArgs{"subject", "6.824"}
reply := PutReply{}
err := client.Call("KV.Put", &args, &reply)
if err != nil {
log.Fatal("error:", err)
}
client.Close()
}
//
// Server
//
type KV struct {
mu sync.Mutex
data map[string]string
}
func server() {
kv := new(KV)
kv.data = map[string]string{}
rpcs := rpc.NewServer()
rpcs.Register(kv)
l, e := net.Listen("tcp", ":1234")
if e != nil {
log.Fatal("listen error:", e)
}
go func() {
for {
conn, err := l.Accept()
if err == nil {
go rpcs.ServeConn(conn)
} else {
break
}
}
l.Close()
}()
}
func (kv *KV) Get(args *GetArgs, reply *GetReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
val, ok := kv.data[args.Key]
if ok {
reply.Err = OK
reply.Value = val
} else {
reply.Err = ErrNoKey
reply.Value = ""
}
return nil
}
func (kv *KV) Put(args *PutArgs, reply *PutReply) error {
kv.mu.Lock()
defer kv.mu.Unlock()
kv.data[args.Key] = args.Value
reply.Err = OK
return nil
}
//
// main
//
func main() {
server()
put("subject", "6.824")
fmt.Printf("Put(subject, 6.824) done\n")
fmt.Printf("get(subject) -> %s\n", get("subject"))
}
RPC semantics under failures #
- at-least-once
- at-most-once
- Exactly-once: hard to arrange (lab 3)
RPC \(\neq\) PR(procedure call).