230 lines
4.7 KiB
Go
230 lines
4.7 KiB
Go
package main
|
|
|
|
import (
|
|
"encoding/json"
|
|
"flag"
|
|
"log"
|
|
"net/http"
|
|
|
|
"github.com/dgraph-io/badger"
|
|
"github.com/nats-io/go-nats"
|
|
"github.com/golang/protobuf/proto"
|
|
)
|
|
|
|
var dbPath string
|
|
var natsHost string
|
|
|
|
var db *badger.DB
|
|
var nc *nats.Conn
|
|
|
|
func main() {
|
|
// Parse flags
|
|
flag.StringVar(&dbPath, "dbpath", "/tmp/badger", "path to store data")
|
|
flag.StringVar(&natsHost, "nats", "nats://localhost:4222", "host and port of NATS")
|
|
flag.Parse()
|
|
|
|
// Open badger
|
|
log.Printf("starting badger at %s", dbPath)
|
|
opts := badger.DefaultOptions
|
|
opts.Dir = dbPath
|
|
opts.ValueDir = dbPath
|
|
var err error
|
|
db, err = badger.Open(opts)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
defer db.Close()
|
|
|
|
// NATS client
|
|
nc, err = nats.Connect(natsHost)
|
|
if err != nil {
|
|
log.Fatal(err)
|
|
}
|
|
nc.Subscribe("new_store", NewStore)
|
|
|
|
nc.Subscribe("request_store", RequestStore)
|
|
nc.Subscribe("scan_store", ScanStore)
|
|
defer nc.Close()
|
|
|
|
select { } // Wait forever
|
|
}
|
|
|
|
func NewStore(m *nats.Msg) {
|
|
storeRequest := Store{}
|
|
if err := proto.Unmarshal(m.Data, &storeRequest); err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
key, err := MarshalKey(storeRequest.Type, storeRequest.Bite.Key, storeRequest.Bite.Start)
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
err = db.Update(func(txn *badger.Txn) error {
|
|
// TODO: prevent overwriting existing
|
|
err := txn.Set(key, storeRequest.Bite.Data)
|
|
return err
|
|
})
|
|
|
|
if err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
}
|
|
|
|
func RequestStore(m *nats.Msg) {
|
|
req := DataRequest{}
|
|
if err := proto.Unmarshal(m.Data, &req); err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
key, err := MarshalKey(req.Type, req.Key, req.Start)
|
|
|
|
err = db.View(func(txn *badger.Txn) error {
|
|
item, err := txn.Get(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
err = item.Value(func(value []byte) error {
|
|
res := Response {
|
|
Code: 200,
|
|
Message: value,
|
|
}
|
|
resBytes, resErr := proto.Marshal(&res)
|
|
|
|
if resErr != nil {
|
|
return resErr
|
|
}
|
|
|
|
nc.Publish(m.Reply, resBytes)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
res := ReplyError(err.Error(), 400)
|
|
nc.Publish(m.Reply, res)
|
|
}
|
|
}
|
|
|
|
type BitesList struct {
|
|
Previous uint64 `json:"previous"` // One bite before starts. Hint for how many steps the client can skip
|
|
Starts []uint64 `json:"starts"`
|
|
Next uint64 `json:"next"` // One bite after starts. Hint for how many steps the client can skip
|
|
}
|
|
|
|
func ScanStore(m *nats.Msg) {
|
|
req := ScanRequest {}
|
|
if err := proto.Unmarshal(m.Data, &req); err != nil {
|
|
log.Println(err)
|
|
return
|
|
}
|
|
|
|
prefix, err := MarshalKeyPrefix(req.Type, req.Key)
|
|
|
|
if err != nil {
|
|
res := ReplyError(http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
|
nc.Publish(m.Reply, res)
|
|
return
|
|
}
|
|
|
|
fromKey, err := MarshalKey(req.Type, req.Key, req.From)
|
|
if err != nil {
|
|
res := ReplyError(http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
|
nc.Publish(m.Reply, res)
|
|
return
|
|
}
|
|
|
|
bitesList := BitesList {}
|
|
|
|
err = db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
opts.Reverse = true
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
// Fetch previous key
|
|
it.Seek(fromKey)
|
|
if it.ValidForPrefix(fromKey) {
|
|
// Lazy check to compare key == seeked key
|
|
it.Next()
|
|
}
|
|
if !it.ValidForPrefix(prefix) {
|
|
return nil
|
|
}
|
|
item := it.Item()
|
|
key := item.Key()
|
|
|
|
_, _, start, err := ExtractKey(key)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
bitesList.Previous = start
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
res := ReplyError(err.Error(), http.StatusBadRequest)
|
|
nc.Publish(m.Reply, res)
|
|
return
|
|
}
|
|
|
|
err = db.View(func(txn *badger.Txn) error {
|
|
opts := badger.DefaultIteratorOptions
|
|
opts.PrefetchValues = false
|
|
it := txn.NewIterator(opts)
|
|
defer it.Close()
|
|
|
|
for it.Seek(fromKey); it.ValidForPrefix(prefix); it.Next() {
|
|
item := it.Item()
|
|
key := item.Key()
|
|
|
|
_, _, start, err := ExtractKey(key)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
if start > req.To {
|
|
// A key was found that is greater than to
|
|
// Save that as next
|
|
bitesList.Next = start
|
|
break
|
|
}
|
|
|
|
bitesList.Starts = append(bitesList.Starts, start)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
res := ReplyError(err.Error(), http.StatusBadRequest)
|
|
nc.Publish(m.Reply, res)
|
|
return
|
|
}
|
|
|
|
jsonString, err := json.Marshal(&bitesList)
|
|
res := Response {
|
|
Code: 200,
|
|
Message: []byte(jsonString),
|
|
}
|
|
resBytes, _ := proto.Marshal(&res)
|
|
nc.Publish(m.Reply, resBytes)
|
|
}
|
|
|
|
func ReplyError(msg string, code uint32) []byte {
|
|
res := Response {
|
|
Code: code,
|
|
Message: []byte(msg),
|
|
}
|
|
resBytes, _ := proto.Marshal(&res)
|
|
|
|
return resBytes
|
|
}
|