Skip to content
Open
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
fa8bcf6
feat(s3): add e2e coverage for blob-backed API
pthmas Apr 28, 2026
d1eec81
fix(lint): simplify apex service setup wiring
pthmas Apr 29, 2026
d9453ce
Merge remote-tracking branch 'origin/main' into pthmas/s3-blob-export
pthmas Apr 29, 2026
f618715
feat(s3): add configurable sigv4 auth
pthmas Apr 29, 2026
8d1fb3e
warn on unauthenticated s3 startup
pthmas Apr 29, 2026
2e6da25
fix(s3): enforce read-only mode when submission is not configured
pthmas Apr 29, 2026
2867ed0
fix(s3): harden server, auth, and store correctness
pthmas Apr 29, 2026
6e7281b
docs(s3): clarify PutObject behavior for empty objects
pthmas Apr 29, 2026
d97437e
fix(s3): address critical routing/race bugs and medium hardening
pthmas Apr 29, 2026
e70b560
fix(s3): address critical routing/race bugs and medium hardening
pthmas Apr 30, 2026
529218f
feat(s3): submit commitment envelope to Celestia instead of raw data
pthmas Apr 30, 2026
a96ce16
fix(s3): validate payload hash and check bucket before Celestia submit
pthmas Apr 30, 2026
41fe108
fix(s3): address pagination, env-var credentials, and SQL limit
pthmas May 4, 2026
c868668
fix(e2e): verify envelope commitment on Celestia, not raw object data
pthmas May 4, 2026
0e2f36d
refactor(s3): remove unused fields, fix double ETag, clean dead code
pthmas May 4, 2026
3e61846
chore(s3): drop unused columns from migration
pthmas May 4, 2026
1222861
fix(s3): reject empty objects instead of storing them without Celesti…
pthmas May 4, 2026
ac46e3d
fix(e2e): remove pkg/s3 import to avoid proto conflict
pthmas May 4, 2026
5a3d2e0
fix(lint): fix gofumpt, perfsprint, unparam violations
pthmas May 4, 2026
c3bd226
fix(s3): address PR review comments
pthmas May 5, 2026
1a25d38
fix(s3): thread ctx into setupS3Server, close on startup failure
pthmas May 5, 2026
83ecea4
refactor(s3): extract startRPCServers to reduce gocyclo
pthmas May 5, 2026
e14fea8
chore(justfile): add e2e-s3 target for S3 lifecycle test
pthmas May 5, 2026
9451b9a
fix s3 empty upload and submitter wiring
pthmas May 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 101 additions & 18 deletions cmd/apex/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

Expand All @@ -24,6 +25,7 @@
"github.com/evstack/apex/pkg/fetch"
"github.com/evstack/apex/pkg/metrics"
"github.com/evstack/apex/pkg/profile"
apexs3 "github.com/evstack/apex/pkg/s3"
"github.com/evstack/apex/pkg/store"
"github.com/evstack/apex/pkg/submit"
syncer "github.com/evstack/apex/pkg/sync"
Expand Down Expand Up @@ -214,6 +216,83 @@
}
}

func setupS3Server(cfg *config.Config, db store.Store, blobSubmitter submit.Submitter, log zerolog.Logger) (*http.Server, error) {
if !cfg.S3.Enabled {
return nil, nil
}

if cfg.S3.AccessKeyID == "" && cfg.S3.SecretAccessKey == "" {
warnLog := log.Warn().Str("addr", cfg.S3.ListenAddr)
if isLoopbackBindAddr(cfg.S3.ListenAddr) {
warnLog.Msg("S3 API authentication is disabled; restrict access to trusted local clients or set APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY")
} else {
warnLog.Msg("S3 API authentication is disabled on a non-loopback bind; set APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY or place Apex behind a trusted authenticated proxy")
}
}

var ns types.Namespace
if cfg.S3.Namespace != "" {
var err error
ns, err = types.NamespaceFromHex(cfg.S3.Namespace)
if err != nil {
return nil, fmt.Errorf("parse S3 namespace: %w", err)
}
}

sqliteDB, ok := db.(*store.SQLiteStore)
if !ok {
return nil, fmt.Errorf("S3 API requires SQLite store, got %T", db)
}

objStore := store.NewObjectStore(sqliteDB)
s3Svc := apexs3.NewService(objStore, blobSubmitter, ns)
s3Srv := apexs3.NewServer(s3Svc, cfg.S3.Region, cfg.S3.AccessKeyID, cfg.S3.SecretAccessKey, log)

lis, err := net.Listen("tcp", cfg.S3.ListenAddr)

Check failure on line 251 in cmd/apex/main.go

View workflow job for this annotation

GitHub Actions / Lint

net.Listen must not be called. use (*net.ListenConfig).Listen (noctx)
if err != nil {
return nil, fmt.Errorf("S3 API: listen %s: %w", cfg.S3.ListenAddr, err)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

httpSrv := &http.Server{
Handler: s3Srv,
ReadHeaderTimeout: 10 * time.Second,
}

go func() {
log.Info().Str("addr", cfg.S3.ListenAddr).Msg("S3 API server listening")
if err := httpSrv.Serve(lis); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Error().Err(err).Msg("S3 API server error")
}
}()
Comment thread
coderabbitai[bot] marked this conversation as resolved.

return httpSrv, nil
}

func isLoopbackBindAddr(addr string) bool {
addr = strings.TrimSpace(addr)
if addr == "" {
return false
}
// Unix sockets are always local.
if strings.HasPrefix(addr, "/") || strings.HasPrefix(addr, "unix:") {
return true
}
host := addr
if h, _, err := net.SplitHostPort(addr); err == nil {
host = h
}
host = strings.Trim(host, "[]") // strip IPv6 brackets
if host == "" {
return false // bare ":port" binds all interfaces
}
lower := strings.ToLower(host)
if lower == "localhost" || strings.HasSuffix(lower, ".localhost") {
return true
}
ip := net.ParseIP(host)
return ip != nil && ip.IsLoopback()
}

func persistNamespaces(ctx context.Context, db store.Store, namespaces []types.Namespace) error {
for _, ns := range namespaces {
if err := db.PutNamespace(ctx, ns); err != nil {
Expand Down Expand Up @@ -282,11 +361,21 @@
}
defer dataFetcher.Close() //nolint:errcheck

svc, notifier, closeSubmitter, err := setupAPIService(cfg, db, dataFetcher, proofFwd, rec)
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return err
}
defer closeSubmitter()
if blobSubmitter != nil {
defer blobSubmitter.Close() //nolint:errcheck
}

// Setup S3 API server if enabled.
s3Srv, err := setupS3Server(cfg, db, blobSubmitter, log.Logger)
if err != nil {
return fmt.Errorf("setup S3 server: %w", err)
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated

svc, notifier := setupAPIService(cfg, db, dataFetcher, proofFwd, rec, blobSubmitter)

// Build and run the sync coordinator with observer hook.
coordOpts, closeBackfill, err := buildCoordinatorOptions(cfg, notifier, rec)
Expand Down Expand Up @@ -342,7 +431,7 @@

err = coord.Run(ctx)

gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv)
gracefulShutdown(httpSrv, grpcSrv, metricsSrv, profileSrv, s3Srv)

if err != nil && !errors.Is(err, context.Canceled) {
return fmt.Errorf("coordinator: %w", err)
Expand Down Expand Up @@ -385,19 +474,7 @@
return blobSubmitter, nil
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder) (*api.Service, *api.Notifier, func(), error) {
blobSubmitter, err := openBlobSubmitter(cfg)
if err != nil {
return nil, nil, nil, err
}

closeSubmitter := func() {}
if blobSubmitter != nil {
closeSubmitter = func() {
_ = blobSubmitter.Close()
}
}

func setupAPIService(cfg *config.Config, db store.Store, dataFetcher fetch.DataFetcher, proofFwd fetch.ProofForwarder, rec metrics.Recorder, blobSubmitter submit.Submitter) (*api.Service, *api.Notifier) {
notifier := api.NewNotifier(cfg.Subscription.BufferSize, cfg.Subscription.MaxSubscribers, log.Logger)
notifier.SetMetrics(rec)

Expand All @@ -407,7 +484,7 @@
}

svc := api.NewService(db, dataFetcher, proofFwd, notifier, log.Logger, svcOpts...)
return svc, notifier, closeSubmitter, nil
return svc, notifier
}

func buildCoordinatorOptions(cfg *config.Config, notifier *api.Notifier, rec metrics.Recorder) ([]syncer.Option, func(), error) {
Expand Down Expand Up @@ -436,7 +513,7 @@
return coordOpts, closeBackfill, nil
}

func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server) {
func gracefulShutdown(httpSrv *http.Server, grpcSrv *grpc.Server, metricsSrv *metrics.Server, profileSrv *profile.Server, s3Srv *http.Server) {
stopped := make(chan struct{})
go func() {
grpcSrv.GracefulStop()
Expand All @@ -457,6 +534,12 @@
log.Error().Err(err).Msg("JSON-RPC server shutdown error")
}

if s3Srv != nil {
if err := s3Srv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("S3 API server shutdown error")
}
}

if metricsSrv != nil {
if err := metricsSrv.Shutdown(shutdownCtx); err != nil {
log.Error().Err(err).Msg("metrics server shutdown error")
Expand Down
34 changes: 34 additions & 0 deletions cmd/apex/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package main

import "testing"

func TestIsLoopbackBindAddr(t *testing.T) {
t.Parallel()

tests := []struct {
addr string
want bool
}{
{addr: "127.0.0.1:8333", want: true},
{addr: "[::1]:8333", want: true},
{addr: "localhost:8333", want: true},
{addr: "api.localhost:8333", want: true},
{addr: "unix:///tmp/apex.sock", want: true},
{addr: "/tmp/apex.sock", want: true},
{addr: ":8333", want: false},
{addr: "0.0.0.0:8333", want: false},
{addr: "[::]:8333", want: false},
{addr: "apex.example.com:8333", want: false},
{addr: "", want: false},
}

for _, tt := range tests {
t.Run(tt.addr, func(t *testing.T) {
t.Parallel()

if got := isLoopbackBindAddr(tt.addr); got != tt.want {
t.Fatalf("isLoopbackBindAddr(%q) = %v, want %v", tt.addr, got, tt.want)
}
})
}
}
18 changes: 18 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type Config struct {
Profiling ProfilingConfig `yaml:"profiling"`
Log LogConfig `yaml:"log"`
Submission SubmissionConfig `yaml:"submission"`
S3 S3APIConfig `yaml:"s3"`
}

// DataSourceConfig configures the Celestia data source.
Expand Down Expand Up @@ -94,6 +95,18 @@ type LogConfig struct {
Format string `yaml:"format"`
}

// S3APIConfig configures the S3-compatible API server.
type S3APIConfig struct {
Enabled bool `yaml:"enabled"`
ListenAddr string `yaml:"listen_addr"`
Region string `yaml:"region"`
Namespace string `yaml:"namespace"` // Celestia namespace for S3 objects (hex)
// AccessKeyID and SecretAccessKey are not read from YAML; set via
// APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY env vars.
AccessKeyID string `yaml:"-"`
SecretAccessKey string `yaml:"-"`
}

// SubmissionConfig contains settings for the future blob submission pipeline.
type SubmissionConfig struct {
Enabled bool `yaml:"enabled"`
Expand Down Expand Up @@ -155,6 +168,11 @@ func DefaultConfig() Config {
Level: "info",
Format: "json",
},
S3: S3APIConfig{
Enabled: false,
ListenAddr: ":8333",
Region: "us-east-1",
},
}
}

Expand Down
61 changes: 60 additions & 1 deletion config/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ storage:
# endpoint: "" # custom endpoint for MinIO, R2, etc.
# chunk_size: 64 # heights per S3 object

s3:
# Enable the S3-compatible HTTP API backed by SQLite object storage.
enabled: false
# Address for the S3-compatible API server.
listen_addr: ":8333"
# AWS region reported to clients.
region: "us-east-1"
# Namespace used when S3 uploads are submitted to Celestia.
namespace: ""
# Optional SigV4 credentials enforced by the S3 API.
# Set via env vars only (not in this file):
# APEX_S3_ACCESS_KEY_ID=<key-id>
# APEX_S3_SECRET_ACCESS_KEY=<secret>

rpc:
# Address for the JSON-RPC API server (HTTP/WebSocket)
listen_addr: ":8080"
Expand Down Expand Up @@ -158,10 +172,16 @@ func Load(path string) (*Config, error) {
return nil, fmt.Errorf("parsing config: %w", err)
}

// Env var override.
// Env var overrides.
if token := os.Getenv("APEX_AUTH_TOKEN"); token != "" {
cfg.DataSource.AuthToken = token
}
if v := os.Getenv("APEX_S3_ACCESS_KEY_ID"); v != "" {
cfg.S3.AccessKeyID = v
}
if v := os.Getenv("APEX_S3_SECRET_ACCESS_KEY"); v != "" {
cfg.S3.SecretAccessKey = v
}
if err := validate(&cfg); err != nil {
return nil, fmt.Errorf("validating config: %w", err)
}
Expand Down Expand Up @@ -290,6 +310,9 @@ func validate(cfg *Config) error {
if err := validateSubmission(&cfg.Submission); err != nil {
return err
}
if err := validateS3API(&cfg.S3, &cfg.Storage, &cfg.Submission); err != nil {
return err
}
if !validLogLevels[cfg.Log.Level] {
return fmt.Errorf("log.level %q is invalid; must be one of trace/debug/info/warn/error/fatal/panic", cfg.Log.Level)
}
Expand Down Expand Up @@ -381,6 +404,42 @@ func validateSubmission(s *SubmissionConfig) error {
return nil
}

func validateS3API(s3cfg *S3APIConfig, storage *StorageConfig, submission *SubmissionConfig) error {
s3cfg.ListenAddr = strings.TrimSpace(s3cfg.ListenAddr)
s3cfg.Region = strings.TrimSpace(s3cfg.Region)
s3cfg.Namespace = strings.TrimSpace(s3cfg.Namespace)
s3cfg.AccessKeyID = strings.TrimSpace(s3cfg.AccessKeyID)
s3cfg.SecretAccessKey = strings.TrimSpace(s3cfg.SecretAccessKey)
if s3cfg.Region == "" {
s3cfg.Region = DefaultConfig().S3.Region
}
if !s3cfg.Enabled {
return nil
}
if (s3cfg.AccessKeyID == "") != (s3cfg.SecretAccessKey == "") {
return errors.New("s3.access_key_id and s3.secret_access_key must be provided together (set via APEX_S3_ACCESS_KEY_ID and APEX_S3_SECRET_ACCESS_KEY)")
}
if s3cfg.ListenAddr == "" {
return errors.New("s3.listen_addr is required when s3.enabled is true")
}
if storage.Type != "sqlite" && storage.Type != "" {
return errors.New("s3.enabled requires storage.type to be \"sqlite\"")
}
if s3cfg.Namespace != "" {
ns, err := types.NamespaceFromHex(s3cfg.Namespace)
if err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
if err := ns.ValidateForBlob(); err != nil {
return fmt.Errorf("s3.namespace is invalid: %w", err)
}
}
if submission.Enabled && s3cfg.Namespace == "" {
return errors.New("s3.namespace is required when both s3.enabled and submission.enabled are true")
}
return nil
}

func resolveSubmissionSignerKeyPath(s *SubmissionConfig, baseDir string) error {
if !s.Enabled {
return nil
Expand Down
Loading
Loading