Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 158 additions & 23 deletions backends/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ package backends
import (
"context"
"crypto/tls"
"reflect"
"strconv"
"time"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/prebid/prebid-cache/config"
"github.com/prebid/prebid-cache/utils"
log "github.com/sirupsen/logrus"
Expand All @@ -21,21 +22,35 @@ type RedisDB interface {
}

// RedisDBClient is a wrapper for the Redis client that implements
// the RedisDB interface
// the RedisDB interface. It can work with both standalone and cluster clients.
type RedisDBClient struct {
client *redis.Client
client interface{} // Can be either *redis.Client or *redis.ClusterClient
}

// Get returns the value associated with the provided `key` parameter
func (db RedisDBClient) Get(ctx context.Context, key string) (string, error) {
return db.client.Get(ctx, key).Result()
switch c := db.client.(type) {
case *redis.Client:
return c.Get(ctx, key).Result()
case *redis.ClusterClient:
return c.Get(ctx, key).Result()
default:
return "", utils.NewPBCError(utils.KEY_NOT_FOUND)
}
}

// Put will set 'key' to hold string 'value' if 'key' does not exist in the redis storage.
// When key already holds a value, no operation is performed. That's the reason this adapter
// uses the 'github.com/go-redis/redis's library SetNX. SetNX is short for "SET if Not eXists".
func (db RedisDBClient) Put(ctx context.Context, key, value string, ttlSeconds int) (bool, error) {
return db.client.SetNX(ctx, key, value, time.Duration(ttlSeconds)*time.Second).Result()
switch c := db.client.(type) {
case *redis.Client:
return c.SetNX(ctx, key, value, time.Duration(ttlSeconds)*time.Second).Result()
case *redis.ClusterClient:
return c.SetNX(ctx, key, value, time.Duration(ttlSeconds)*time.Second).Result()
default:
return false, utils.NewPBCError(utils.KEY_NOT_FOUND)
}
}

// RedisBackend when initialized will instantiate and configure the Redis client. It implements
Expand All @@ -47,42 +62,162 @@ type RedisBackend struct {

// NewRedisBackend initializes the redis client and pings to make sure connection was successful
func NewRedisBackend(cfg config.Redis, ctx context.Context) *RedisBackend {
constr := cfg.Host + ":" + strconv.Itoa(cfg.Port)
var redisClient RedisDBClient

options := &redis.Options{
Addr: constr,
Password: cfg.Password,
DB: cfg.Db,
}
if cfg.Cluster.Enabled {
// Cluster mode
clusterOptions := &redis.ClusterOptions{
Addrs: cfg.Cluster.Hosts,
Password: cfg.Password,
// Note: DB selection is not supported in cluster mode
}

// Apply performance configuration for cluster
applyClusterPerformanceOptions(clusterOptions, cfg)

if cfg.TLS.Enabled {
options = &redis.Options{
if cfg.TLS.Enabled {
clusterOptions.TLSConfig = &tls.Config{
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
}
}

clusterClient := redis.NewClusterClient(clusterOptions)

// Test cluster connection
_, err := clusterClient.Ping(ctx).Result()
if err != nil {
log.Fatalf("Error creating Redis cluster backend: %v", err)
panic("RedisBackend failure. This shouldn't happen.")
}

log.Infof("Connected to Redis cluster with hosts: %v", cfg.Cluster.Hosts)
redisClient = RedisDBClient{client: clusterClient}
} else {
// Single-node mode
constr := cfg.Host + ":" + strconv.Itoa(cfg.Port)

options := &redis.Options{
Addr: constr,
Password: cfg.Password,
DB: cfg.Db,
TLSConfig: &tls.Config{
}

// Apply performance configuration for single-node
applySingleNodePerformanceOptions(options, cfg)

if cfg.TLS.Enabled {
options.TLSConfig = &tls.Config{
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
},
}
}
}

redisClient := RedisDBClient{client: redis.NewClient(options)}
singleClient := redis.NewClient(options)

_, err := redisClient.client.Ping(ctx).Result()
// Test single-node connection
_, err := singleClient.Ping(ctx).Result()
if err != nil {
log.Fatalf("Error creating Redis backend: %v", err)
panic("RedisBackend failure. This shouldn't happen.")
}

if err != nil {
log.Fatalf("Error creating Redis backend: %v", err)
panic("RedisBackend failure. This shouldn't happen.")
log.Infof("Connected to Redis at %s:%d", cfg.Host, cfg.Port)
redisClient = RedisDBClient{client: singleClient}
}

log.Infof("Connected to Redis at %s:%d", cfg.Host, cfg.Port)

return &RedisBackend{
cfg: cfg,
client: redisClient,
}
}

// applyPerformanceOptions applies performance tuning options to both single-node and cluster Redis clients
// This function uses reflection to set common fields on both redis.Options and redis.ClusterOptions
func applyPerformanceOptions(options interface{}, cfg config.Redis) {
optionsValue := reflect.ValueOf(options).Elem()

// Apply pool configuration
if cfg.Pool != nil {
if cfg.Pool.Size > 0 {
if field := optionsValue.FieldByName("PoolSize"); field.IsValid() && field.CanSet() {
field.SetInt(int64(cfg.Pool.Size))
}
}
if cfg.Pool.Timeout > 0 {
if field := optionsValue.FieldByName("PoolTimeout"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Pool.Timeout))
}
}
if cfg.Pool.MinIdleConns > 0 {
if field := optionsValue.FieldByName("MinIdleConns"); field.IsValid() && field.CanSet() {
field.SetInt(int64(cfg.Pool.MinIdleConns))
}
}
if cfg.Pool.MaxIdleConns > 0 {
if field := optionsValue.FieldByName("MaxIdleConns"); field.IsValid() && field.CanSet() {
field.SetInt(int64(cfg.Pool.MaxIdleConns))
}
}
if cfg.Pool.ConnMaxIdleTime > 0 {
if field := optionsValue.FieldByName("ConnMaxIdleTime"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Pool.ConnMaxIdleTime))
}
}
if cfg.Pool.ConnMaxLifetime > 0 {
if field := optionsValue.FieldByName("ConnMaxLifetime"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Pool.ConnMaxLifetime))
}
}
}

// Apply timeout configuration
if cfg.Timeouts != nil {
if cfg.Timeouts.DialTimeout > 0 {
if field := optionsValue.FieldByName("DialTimeout"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Timeouts.DialTimeout))
}
}
if cfg.Timeouts.ReadTimeout > 0 {
if field := optionsValue.FieldByName("ReadTimeout"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Timeouts.ReadTimeout))
}
}
if cfg.Timeouts.WriteTimeout > 0 {
if field := optionsValue.FieldByName("WriteTimeout"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Timeouts.WriteTimeout))
}
}
}

// Apply retry configuration
if cfg.Retry != nil {
if cfg.Retry.MaxRetries >= 0 {
if field := optionsValue.FieldByName("MaxRetries"); field.IsValid() && field.CanSet() {
field.SetInt(int64(cfg.Retry.MaxRetries))
}
}
if cfg.Retry.MinRetryBackoff > 0 {
if field := optionsValue.FieldByName("MinRetryBackoff"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Retry.MinRetryBackoff))
}
}
if cfg.Retry.MaxRetryBackoff > 0 {
if field := optionsValue.FieldByName("MaxRetryBackoff"); field.IsValid() && field.CanSet() {
field.Set(reflect.ValueOf(cfg.Retry.MaxRetryBackoff))
}
}
}
}

// applySingleNodePerformanceOptions applies performance tuning options to single-node Redis client
func applySingleNodePerformanceOptions(options *redis.Options, cfg config.Redis) {
applyPerformanceOptions(options, cfg)
}

// applyClusterPerformanceOptions applies performance tuning options to cluster Redis client
func applyClusterPerformanceOptions(options *redis.ClusterOptions, cfg config.Redis) {
applyPerformanceOptions(options, cfg)
}

// Get calls the Redis client to return the value associated with the provided `key`
// parameter and interprets its response. A `Nil` error reply of the Redis client means
// the `key` does not exist.
Expand Down
2 changes: 1 addition & 1 deletion backends/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"testing"

"github.com/go-redis/redis/v8"
"github.com/redis/go-redis/v9"
"github.com/prebid/prebid-cache/utils"
"github.com/stretchr/testify/assert"
)
Expand Down
43 changes: 40 additions & 3 deletions config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,41 @@ request_limits:
request_logging:
referer_sampling_rate: 0.0
backend:
type: "memory" # Switch to be "aerospike", "cassandra", "memcache", "ignite" or "redis" for production.
type: memory

# Redis backend configuration (when type: redis)
redis:
host: localhost
port: 6379
password: ""
db: 0

# Optional: Redis performance tuning (uncomment to use)
# pool:
# size: 100 # Connection pool size (0 = default: 10 * GOMAXPROCS)
# timeout: 5s # Max time to wait for connection from pool
# min_idle_conns: 10 # Minimum idle connections to maintain
# max_idle_conns: 20 # Maximum idle connections
# conn_max_idle_time: 30m # Close connections after remaining idle
# conn_max_lifetime: 60m # Close connections after this lifetime

# timeouts:
# dial_timeout: 5s # Timeout for establishing connections
# read_timeout: 3s # Timeout for socket reads
# write_timeout: 3s # Timeout for socket writes

# retry:
# max_retries: 3 # Maximum number of retries
# min_retry_backoff: 8ms # Minimum backoff between retries
# max_retry_backoff: 512ms # Maximum backoff between retries

tls:
enabled: false
insecure_skip_verify: false

cluster:
enabled: false
hosts: []
# aerospike:
# hosts: [ "aerospike.prebid.com" ]
# port: 3000
Expand All @@ -24,7 +58,7 @@ backend:
# memcache:
# config_host: "" # Configuration endpoint for auto discovery. Replaced at docker build.
# poll_interval_seconds: 30 # Node change polling interval when auto discovery is used
# hosts: "10.0.0.1:11211" # List of nodes when not using auto discovery. Can also use an array for multiple hosts.
# hosts: "10.0.0.1:11211" # List of nodes when not using auto discovery. Can also use an array for multiple hosts.
# redis:
# host: "127.0.0.1"
# port: 6379
Expand All @@ -34,6 +68,9 @@ backend:
# tls:
# enabled: false
# insecure_skip_verify: false
# cluster:
# enabled: false
# hosts: ["redis-node1:6379", "redis-node2:6379", "redis-node3:6379"]
# ignite:
# scheme: "http"
# host: "127.0.0.1"
Expand All @@ -47,7 +84,7 @@ backend:
compression:
type: "snappy" # Can also be "none"
metrics:
type: "none" # Can also be "influx"
type: "none" # Can also be "influx"
influx:
host: "http://influx.prebid.com"
database: "some-database"
Expand Down
Loading