Skip to content

Commit 346f8fb

Browse files
corylanouclaude
andauthored
feat: add NATS JetStream Object Store replica client (#712)
Co-authored-by: Claude <[email protected]>
1 parent b9e1ebf commit 346f8fb

File tree

8 files changed

+822
-0
lines changed

8 files changed

+822
-0
lines changed

.github/workflows/commit.yml

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
on:
22
push:
3+
branches:
4+
- main
35
pull_request:
46
types:
57
- opened
@@ -102,6 +104,65 @@ jobs:
102104

103105
- run: ./etc/s3_mock.py go test -v ./replica_client_test.go -integration s3
104106

107+
nats-docker-test:
108+
name: Run NATS Integration Tests
109+
runs-on: ubuntu-latest
110+
needs: build
111+
steps:
112+
- uses: actions/checkout@v4
113+
114+
- uses: actions/setup-go@v5
115+
with:
116+
go-version-file: "go.mod"
117+
118+
- name: Start NATS Server with JetStream
119+
run: |
120+
docker run -d \
121+
--name nats-test \
122+
-p 4222:4222 \
123+
-p 8222:8222 \
124+
nats:latest \
125+
-js \
126+
-DV
127+
128+
# Wait for NATS to be ready
129+
echo "Waiting for NATS server to be ready..."
130+
for i in {1..30}; do
131+
if nc -z localhost 4222; then
132+
echo "NATS server is ready"
133+
break
134+
fi
135+
echo "Waiting for NATS server... ($i/30)"
136+
sleep 1
137+
done
138+
139+
- name: Create NATS Object Store Bucket
140+
run: |
141+
# Install NATS CLI - get latest version
142+
NATS_VERSION=$(curl -s https://api.github.com/repos/nats-io/natscli/releases/latest | grep '"tag_name":' | sed -E 's/.*"v([^"]+)".*/\1/')
143+
wget "https://github.com/nats-io/natscli/releases/download/v${NATS_VERSION}/nats-${NATS_VERSION}-linux-amd64.zip" -O nats.zip
144+
unzip nats.zip
145+
sudo mv "nats-${NATS_VERSION}-linux-amd64/nats" /usr/local/bin/
146+
sudo chmod +x /usr/local/bin/nats
147+
148+
# Create the object store bucket
149+
nats object add litestream-test --max-bucket-size=100M --replicas=1
150+
151+
- run: go env
152+
153+
- run: go install ./cmd/litestream
154+
155+
- run: go test -v ./replica_client_test.go -integration nats
156+
env:
157+
LITESTREAM_NATS_URL: "nats://localhost:4222"
158+
LITESTREAM_NATS_BUCKET: "litestream-test"
159+
160+
- name: Cleanup
161+
if: always()
162+
run: |
163+
docker stop nats-test || true
164+
docker rm nats-test || true
165+
105166
s3-integration-test:
106167
name: Run S3 Integration Tests
107168
runs-on: ubuntu-latest

cmd/litestream/main.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/benbjohnson/litestream/file"
2727
"github.com/benbjohnson/litestream/gs"
2828
"github.com/benbjohnson/litestream/internal"
29+
"github.com/benbjohnson/litestream/nats"
2930
"github.com/benbjohnson/litestream/s3"
3031
"github.com/benbjohnson/litestream/sftp"
3132
)
@@ -559,6 +560,19 @@ type ReplicaConfig struct {
559560
Password string `yaml:"password"`
560561
KeyPath string `yaml:"key-path"`
561562

563+
// NATS settings
564+
JWT string `yaml:"jwt"`
565+
Seed string `yaml:"seed"`
566+
Creds string `yaml:"creds"`
567+
NKey string `yaml:"nkey"`
568+
Username string `yaml:"username"`
569+
Token string `yaml:"token"`
570+
TLS bool `yaml:"tls"`
571+
RootCAs []string `yaml:"root-cas"`
572+
MaxReconnects *int `yaml:"max-reconnects"`
573+
ReconnectWait *time.Duration `yaml:"reconnect-wait"`
574+
Timeout *time.Duration `yaml:"timeout"`
575+
562576
// Encryption identities and recipients
563577
Age struct {
564578
Identities []string `yaml:"identities"`
@@ -617,6 +631,10 @@ func NewReplicaFromConfig(c *ReplicaConfig, db *litestream.DB) (_ *litestream.Re
617631
if r.Client, err = newSFTPReplicaClientFromConfig(c, r); err != nil {
618632
return nil, err
619633
}
634+
case "nats":
635+
if r.Client, err = newNATSReplicaClientFromConfig(c, r); err != nil {
636+
return nil, err
637+
}
620638
default:
621639
return nil, fmt.Errorf("unknown replica type in config: %q", c.Type)
622640
}
@@ -850,6 +868,72 @@ func newSFTPReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_
850868
return client, nil
851869
}
852870

871+
// newNATSReplicaClientFromConfig returns a new instance of nats.ReplicaClient built from config.
872+
func newNATSReplicaClientFromConfig(c *ReplicaConfig, _ *litestream.Replica) (_ *nats.ReplicaClient, err error) {
873+
// Parse URL if provided to extract bucket name and server URL
874+
var url, bucket string
875+
if c.URL != "" {
876+
scheme, host, path, err := ParseReplicaURL(c.URL)
877+
if err != nil {
878+
return nil, fmt.Errorf("invalid NATS URL: %w", err)
879+
}
880+
if scheme != "nats" {
881+
return nil, fmt.Errorf("invalid scheme for NATS replica: %s", scheme)
882+
}
883+
884+
// Reconstruct URL without bucket path
885+
if host != "" {
886+
url = fmt.Sprintf("nats://%s", host)
887+
}
888+
889+
// Extract bucket name from path
890+
if path != "" {
891+
bucket = strings.Trim(path, "/")
892+
}
893+
}
894+
895+
// Use bucket from config if not extracted from URL
896+
if bucket == "" {
897+
bucket = c.Bucket
898+
}
899+
900+
// Ensure required settings are set
901+
if bucket == "" {
902+
return nil, fmt.Errorf("bucket required for NATS replica")
903+
}
904+
905+
// Build replica client
906+
client := nats.NewReplicaClient()
907+
client.URL = url
908+
client.BucketName = bucket
909+
910+
// Set authentication options
911+
client.JWT = c.JWT
912+
client.Seed = c.Seed
913+
client.Creds = c.Creds
914+
client.NKey = c.NKey
915+
client.Username = c.Username
916+
client.Password = c.Password
917+
client.Token = c.Token
918+
919+
// Set TLS options
920+
client.TLS = c.TLS
921+
client.RootCAs = c.RootCAs
922+
923+
// Set connection options with defaults
924+
if c.MaxReconnects != nil {
925+
client.MaxReconnects = *c.MaxReconnects
926+
}
927+
if c.ReconnectWait != nil {
928+
client.ReconnectWait = *c.ReconnectWait
929+
}
930+
if c.Timeout != nil {
931+
client.Timeout = *c.Timeout
932+
}
933+
934+
return client, nil
935+
}
936+
853937
// applyLitestreamEnv copies "LITESTREAM" prefixed environment variables to
854938
// their AWS counterparts as the "AWS" prefix can be confusing when using a
855939
// non-AWS S3-compatible service.

cmd/litestream/replicate.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/benbjohnson/litestream/abs"
1919
"github.com/benbjohnson/litestream/file"
2020
"github.com/benbjohnson/litestream/gs"
21+
"github.com/benbjohnson/litestream/nats"
2122
"github.com/benbjohnson/litestream/s3"
2223
"github.com/benbjohnson/litestream/sftp"
2324
)
@@ -148,6 +149,8 @@ func (c *ReplicateCommand) Run() (err error) {
148149
slog.Info("replicating to", "bucket", client.Bucket, "path", client.Path, "endpoint", client.Endpoint)
149150
case *sftp.ReplicaClient:
150151
slog.Info("replicating to", "host", client.Host, "user", client.User, "path", client.Path)
152+
case *nats.ReplicaClient:
153+
slog.Info("replicating to", "bucket", client.BucketName, "url", client.URL)
151154
default:
152155
slog.Info("replicating to")
153156
}

go.mod

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ require (
2020
github.com/mark3labs/mcp-go v0.32.0
2121
github.com/mattn/go-shellwords v1.0.12
2222
github.com/mattn/go-sqlite3 v1.14.19
23+
github.com/nats-io/nats.go v1.44.0
2324
github.com/pkg/sftp v1.13.6
2425
github.com/prometheus/client_golang v1.17.0
2526
github.com/superfly/ltx v0.3.18
@@ -63,11 +64,14 @@ require (
6364
github.com/google/uuid v1.6.0 // indirect
6465
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
6566
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
67+
github.com/klauspost/compress v1.18.0 // indirect
6668
github.com/kr/fs v0.1.0 // indirect
6769
github.com/kylelemons/godebug v1.1.0 // indirect
6870
github.com/mattn/go-colorable v0.1.13 // indirect
6971
github.com/mattn/go-isatty v0.0.17 // indirect
7072
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
73+
github.com/nats-io/nkeys v0.4.11 // indirect
74+
github.com/nats-io/nuid v1.0.1 // indirect
7175
github.com/pierrec/lz4/v4 v4.1.22 // indirect
7276
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
7377
github.com/prometheus/client_model v0.5.0 // indirect

go.sum

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f h1:7LYC+Yfk
143143
github.com/hokaccha/go-prettyjson v0.0.0-20211117102719-0474bc63780f/go.mod h1:pFlLw2CfqZiIBOx6BuCeRLCrfxBJipTY0nIOF/VbGcI=
144144
github.com/keybase/go-keychain v0.0.1 h1:way+bWYa6lDppZoZcgMbYsvC7GxljxrskdNInRtuthU=
145145
github.com/keybase/go-keychain v0.0.1/go.mod h1:PdEILRW3i9D8JcdM+FmY6RwkHGnhHxXwkPPMeUgOK1k=
146+
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
147+
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
146148
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
147149
github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg=
148150
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
@@ -167,6 +169,12 @@ github.com/mattn/go-sqlite3 v1.14.19 h1:fhGleo2h1p8tVChob4I9HpmVFIAkKGpiukdrgQbW
167169
github.com/mattn/go-sqlite3 v1.14.19/go.mod h1:2eHXhiwb8IkHr+BDWZGa96P6+rkvnG63S2DGjv9HUNg=
168170
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
169171
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
172+
github.com/nats-io/nats.go v1.44.0 h1:ECKVrDLdh/kDPV1g0gAQ+2+m2KprqZK5O/eJAyAnH2M=
173+
github.com/nats-io/nats.go v1.44.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
174+
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
175+
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
176+
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
177+
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
170178
github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU=
171179
github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
172180
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c h1:+mdjkGKdHQG3305AYmdv1U2eRNDiU2ErMBj1gwrq8eQ=

0 commit comments

Comments
 (0)