You've already forked orderservice
chore: project refactor
This commit is contained in:
@@ -3,6 +3,7 @@ package config
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
|
||||
@@ -33,7 +34,7 @@ func Load() (*Config, error) {
|
||||
HTTPPort: mustGetInt("HTTP_PORT", 8080), //nolint:mnd // false-positive
|
||||
LogLevel: getEnv("LOG_LEVEL", "info"),
|
||||
DBHost: getEnv("POSTGRES_HOST", "localhost"),
|
||||
DBPort: mustGetInt("POSTGRES_PORT", 5432),
|
||||
DBPort: mustGetInt("POSTGRES_PORT", 5432), //nolint:mnd // false-positive
|
||||
DBUser: getEnv("POSTGRES_USERNAME", "postgres"),
|
||||
DBPassword: getEnv("POSTGRES_PASSWORD", "postgres"),
|
||||
DBName: getEnv("POSTGRES_DATABASE", "postgres"),
|
||||
@@ -66,7 +67,12 @@ func mustGetBool(key string, def bool) bool {
|
||||
return b
|
||||
}
|
||||
|
||||
func (c Config) BuildDsn() string {
|
||||
func (c Config) BuildPostgresConnStr() string {
|
||||
return fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable",
|
||||
c.DBHost, c.DBPort, c.DBUser, c.DBPassword, c.DBName)
|
||||
}
|
||||
|
||||
func (c Config) BuildPostgresDSN() string {
|
||||
return fmt.Sprintf("postgresql://%s:%s@%s/%s?sslmode=disable",
|
||||
c.DBUser, c.DBPassword, net.JoinHostPort(c.DBHost, strconv.Itoa(c.DBPort)), c.DBName)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
package domain
|
||||
|
||||
import (
|
||||
"errors"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidID = errors.New("invalid uuid")
|
||||
)
|
||||
@@ -17,7 +17,7 @@ func mapError(err error) error {
|
||||
if errors.Is(err, domain.ErrOrderAlreadyExist) {
|
||||
return status.Error(codes.AlreadyExists, err.Error())
|
||||
}
|
||||
if errors.Is(err, domain.ErrInvalidOrderData) {
|
||||
if errors.Is(err, domain.ErrInvalidOrderData) || errors.Is(err, domain.ErrInvalidID) {
|
||||
return status.Error(codes.InvalidArgument, err.Error())
|
||||
}
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@ import (
|
||||
"orderservice/internal/domain"
|
||||
"orderservice/internal/service"
|
||||
pb "orderservice/pkg/api/order"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type OrderHandler struct {
|
||||
@@ -41,7 +43,12 @@ func (h *OrderHandler) CreateOrder(
|
||||
}
|
||||
|
||||
func (h *OrderHandler) GetOrder(ctx context.Context, req *pb.GetOrderRequest) (*pb.GetOrderResponse, error) {
|
||||
order, err := h.service.Get(ctx, req.GetId())
|
||||
parsedID, err := uuid.Parse(req.GetId())
|
||||
if err != nil {
|
||||
return nil, domain.ErrInvalidID
|
||||
}
|
||||
|
||||
order, err := h.service.Get(ctx, parsedID)
|
||||
if err != nil {
|
||||
return nil, mapError(err)
|
||||
}
|
||||
@@ -53,7 +60,12 @@ func (h *OrderHandler) UpdateOrder(
|
||||
ctx context.Context,
|
||||
req *pb.UpdateOrderRequest,
|
||||
) (*pb.UpdateOrderResponse, error) {
|
||||
order, err := h.service.Update(ctx, req.GetId(), req.GetItem(), req.GetQuantity())
|
||||
parsedID, err := uuid.Parse(req.GetId())
|
||||
if err != nil {
|
||||
return nil, domain.ErrInvalidID
|
||||
}
|
||||
|
||||
order, err := h.service.Update(ctx, parsedID, req.GetItem(), req.GetQuantity())
|
||||
if err != nil {
|
||||
return nil, mapError(err)
|
||||
}
|
||||
@@ -65,7 +77,12 @@ func (h *OrderHandler) DeleteOrder(
|
||||
ctx context.Context,
|
||||
req *pb.DeleteOrderRequest,
|
||||
) (*pb.DeleteOrderResponse, error) {
|
||||
err := h.service.Delete(ctx, req.GetId())
|
||||
parsedID, err := uuid.Parse(req.GetId())
|
||||
if err != nil {
|
||||
return nil, domain.ErrInvalidID
|
||||
}
|
||||
|
||||
err = h.service.Delete(ctx, parsedID)
|
||||
if err != nil {
|
||||
return nil, mapError(err)
|
||||
}
|
||||
|
||||
@@ -32,13 +32,13 @@ func (i *LoggerInterceptor) Unary() grpc.UnaryServerInterceptor {
|
||||
|
||||
if err != nil {
|
||||
if st, ok := status.FromError(err); ok {
|
||||
log.Printf("Error: %s, Code: %s, Duration: %v",
|
||||
log.Printf("error: %s, code: %s, duration: %v",
|
||||
st.Message(), st.Code(), duration)
|
||||
} else {
|
||||
log.Printf("Error: %v, Duration: %v", err, duration)
|
||||
log.Printf("error: %v, duration: %v", err, duration)
|
||||
}
|
||||
} else {
|
||||
log.Printf("Method %s completed in %v", info.FullMethod, duration)
|
||||
log.Printf("method %s completed in %v", info.FullMethod, duration)
|
||||
}
|
||||
|
||||
return resp, err
|
||||
@@ -61,10 +61,10 @@ func (i *LoggerInterceptor) Stream() grpc.StreamServerInterceptor {
|
||||
duration := time.Since(start)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("Stream method %s failed: %v, Duration: %v",
|
||||
log.Printf("stream method %s failed: %v, duration: %v",
|
||||
info.FullMethod, err, duration)
|
||||
} else {
|
||||
log.Printf("Stream method %s completed in %v",
|
||||
log.Printf("stream method %s completed in %v",
|
||||
info.FullMethod, duration)
|
||||
}
|
||||
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"sync"
|
||||
|
||||
"orderservice/internal/domain"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type OrderRepository struct {
|
||||
@@ -34,7 +36,7 @@ func (r *OrderRepository) Create(ctx context.Context, order *domain.Order) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *OrderRepository) Get(ctx context.Context, id string) (*domain.Order, error) {
|
||||
func (r *OrderRepository) Get(ctx context.Context, id uuid.UUID) (*domain.Order, error) {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -42,7 +44,7 @@ func (r *OrderRepository) Get(ctx context.Context, id string) (*domain.Order, er
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
order, ok := r.orders[id]
|
||||
order, ok := r.orders[id.String()]
|
||||
if !ok {
|
||||
return nil, domain.ErrOrderNotFound
|
||||
}
|
||||
@@ -66,7 +68,7 @@ func (r *OrderRepository) Update(ctx context.Context, order *domain.Order) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *OrderRepository) Delete(ctx context.Context, id string) error {
|
||||
func (r *OrderRepository) Delete(ctx context.Context, id uuid.UUID) error {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -74,11 +76,11 @@ func (r *OrderRepository) Delete(ctx context.Context, id string) error {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
if _, ok := r.orders[id]; !ok {
|
||||
if _, ok := r.orders[id.String()]; !ok {
|
||||
return domain.ErrOrderNotFound
|
||||
}
|
||||
|
||||
delete(r.orders, id)
|
||||
delete(r.orders, id.String())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -4,12 +4,14 @@ import (
|
||||
"context"
|
||||
|
||||
"orderservice/internal/domain"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type OrderRepository interface {
|
||||
Create(ctx context.Context, order *domain.Order) error
|
||||
Get(ctx context.Context, id string) (*domain.Order, error)
|
||||
Get(ctx context.Context, id uuid.UUID) (*domain.Order, error)
|
||||
Update(ctx context.Context, order *domain.Order) error
|
||||
Delete(ctx context.Context, id string) error
|
||||
Delete(ctx context.Context, id uuid.UUID) error
|
||||
List(ctx context.Context) ([]*domain.Order, error)
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@ package postgres
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
_ "embed"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
@@ -12,24 +11,19 @@ import (
|
||||
|
||||
"orderservice/internal/domain"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq" // postgres driver
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
//go:embed schema.sql
|
||||
var Schema string
|
||||
|
||||
const (
|
||||
orderCachePrefix = "order:"
|
||||
cacheTTL = 5 * time.Minute
|
||||
maxCacheRetries = 2
|
||||
cacheRetryDelay = 100 * time.Millisecond
|
||||
)
|
||||
|
||||
type OrderRepository struct {
|
||||
db *sqlx.DB
|
||||
cache *redis.Client
|
||||
redisClient *redis.Client
|
||||
cacheEnable bool
|
||||
}
|
||||
|
||||
@@ -46,7 +40,7 @@ func NewOrderRepository(db *sqlx.DB, redisClient *redis.Client, config *Config)
|
||||
|
||||
return &OrderRepository{
|
||||
db: db,
|
||||
cache: redisClient,
|
||||
redisClient: redisClient,
|
||||
cacheEnable: config.CacheEnable,
|
||||
}
|
||||
}
|
||||
@@ -77,19 +71,19 @@ func (r *OrderRepository) Create(ctx context.Context, order *domain.Order) error
|
||||
|
||||
if r.cacheEnable {
|
||||
if err := r.setCacheWithRetry(ctx, order); err != nil {
|
||||
log.Printf("WARN: cache set error for order %s: %v", order.ID, err)
|
||||
log.Printf("warn: cache set error for order %s: %v", order.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *OrderRepository) Get(ctx context.Context, id string) (*domain.Order, error) {
|
||||
func (r *OrderRepository) Get(ctx context.Context, id uuid.UUID) (*domain.Order, error) {
|
||||
if r.cacheEnable {
|
||||
if order, err := r.getFromCache(ctx, id); err == nil {
|
||||
if order, err := r.getFromCache(ctx, id.String()); err == nil {
|
||||
return order, nil
|
||||
} else if !errors.Is(err, redis.Nil) {
|
||||
log.Printf("WARN: cache get error for order %s: %v", id, err)
|
||||
log.Printf("warn: cache get error for order %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -109,7 +103,7 @@ func (r *OrderRepository) Get(ctx context.Context, id string) (*domain.Order, er
|
||||
|
||||
if r.cacheEnable {
|
||||
if err := r.setCacheWithRetry(ctx, &order); err != nil {
|
||||
log.Printf("WARN: cache set error for order %s: %v", id, err)
|
||||
log.Printf("warn: cache set error for order %s: %v", id, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -149,14 +143,14 @@ func (r *OrderRepository) Update(ctx context.Context, order *domain.Order) error
|
||||
|
||||
if r.cacheEnable {
|
||||
if err := r.setCacheWithRetry(ctx, order); err != nil {
|
||||
log.Printf("WARN: cache set error for order %s: %v", order.ID, err)
|
||||
log.Printf("warn: cache set error for order %s: %v", order.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *OrderRepository) Delete(ctx context.Context, id string) error {
|
||||
func (r *OrderRepository) Delete(ctx context.Context, id uuid.UUID) error {
|
||||
tx, err := r.db.BeginTxx(ctx, nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("begin transaction: %w", err)
|
||||
@@ -186,7 +180,7 @@ func (r *OrderRepository) Delete(ctx context.Context, id string) error {
|
||||
return fmt.Errorf("commit transaction: %w", err)
|
||||
}
|
||||
|
||||
r.invalidateCache(ctx, id)
|
||||
r.invalidateCache(ctx, id.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -206,14 +200,14 @@ func (r *OrderRepository) List(ctx context.Context) ([]*domain.Order, error) {
|
||||
}
|
||||
|
||||
func (r *OrderRepository) getFromCache(ctx context.Context, id string) (*domain.Order, error) {
|
||||
data, err := r.cache.Get(ctx, r.cacheKey(id)).Bytes()
|
||||
data, err := r.redisClient.Get(ctx, r.cacheKey(id)).Bytes()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var order domain.Order
|
||||
if err := json.Unmarshal(data, &order); err != nil {
|
||||
r.cache.Del(ctx, r.cacheKey(id))
|
||||
r.redisClient.Del(ctx, r.cacheKey(id))
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -228,16 +222,7 @@ func (r *OrderRepository) setCacheWithRetry(ctx context.Context, order *domain.O
|
||||
|
||||
key := r.cacheKey(order.ID.String())
|
||||
|
||||
for i := range maxCacheRetries {
|
||||
err = r.cache.Set(ctx, key, data, cacheTTL).Err()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if i < maxCacheRetries-1 {
|
||||
time.Sleep(cacheRetryDelay)
|
||||
}
|
||||
}
|
||||
err = r.redisClient.Set(ctx, key, data, cacheTTL).Err()
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -248,11 +233,11 @@ func (r *OrderRepository) invalidateCache(_ context.Context, id string) {
|
||||
}
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), r.redisClient.Options().ReadTimeout)
|
||||
defer cancel()
|
||||
|
||||
if err := r.cache.Del(ctx, r.cacheKey(id)).Err(); err != nil {
|
||||
log.Printf("WARN: cache invalidation failed for order %s: %v", id, err)
|
||||
if err := r.redisClient.Del(ctx, r.cacheKey(id)).Err(); err != nil {
|
||||
log.Printf("warn: cache invalidation failed for order %s: %v", id, err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
+35
-11
@@ -6,6 +6,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"orderservice/internal/config"
|
||||
"orderservice/internal/interceptor"
|
||||
@@ -19,12 +20,25 @@ import (
|
||||
|
||||
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
|
||||
"github.com/jmoiron/sqlx"
|
||||
_ "github.com/lib/pq" // postgres driver
|
||||
"github.com/redis/go-redis/v9"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/reflection"
|
||||
)
|
||||
|
||||
const (
|
||||
httpReadTimeout = 10 * time.Second
|
||||
httpWriteTimeout = 10 * time.Second
|
||||
httpIdleTimeout = 60 * time.Second
|
||||
redisRetryCount = 2
|
||||
redisMinRetryBackoff = 50 * time.Millisecond
|
||||
redisMaxRetryBackoff = 200 * time.Millisecond
|
||||
redisDialTimeout = 1 * time.Second
|
||||
redisDialerRetries = 3
|
||||
redisTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
grpcServer *grpc.Server
|
||||
config *config.Config
|
||||
@@ -62,28 +76,38 @@ func runHTTPHandler(s *Server, grpcServerEndpoint *string) error {
|
||||
mux.Handle("/healthz", httpHandlers.NewHealthHandler(s.db, s.redisDB))
|
||||
mux.Handle("/", gwmux)
|
||||
|
||||
addr := fmt.Sprintf(":%d", s.config.HTTPPort)
|
||||
return http.ListenAndServe(addr, mux)
|
||||
srv := &http.Server{
|
||||
Addr: fmt.Sprintf(":%d", s.config.HTTPPort),
|
||||
Handler: mux,
|
||||
ReadTimeout: httpReadTimeout,
|
||||
WriteTimeout: httpWriteTimeout,
|
||||
IdleTimeout: httpIdleTimeout,
|
||||
}
|
||||
|
||||
return srv.ListenAndServe()
|
||||
}
|
||||
|
||||
func getDatabase(cfg config.Config) (*sqlx.DB, error) {
|
||||
db, err := sqlx.Connect("postgres", cfg.BuildDsn())
|
||||
db, err := sqlx.Connect("postgres", cfg.BuildPostgresConnStr())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("connect to database: %w", err)
|
||||
}
|
||||
|
||||
_, err = db.Exec(orderPostgresRepo.Schema)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("run schema: %w", err)
|
||||
}
|
||||
|
||||
return db, nil
|
||||
}
|
||||
|
||||
func getRedis(cfg config.Config) (*redis.Client, error) {
|
||||
conn, err := redis.ParseURL(cfg.RedisURI)
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: conn.Addr,
|
||||
Addr: conn.Addr,
|
||||
MaxRetries: redisRetryCount,
|
||||
MinRetryBackoff: redisMinRetryBackoff,
|
||||
MaxRetryBackoff: redisMaxRetryBackoff,
|
||||
DialTimeout: redisDialTimeout,
|
||||
DialerRetries: redisDialerRetries,
|
||||
DialerRetryTimeout: redisDialTimeout,
|
||||
ReadTimeout: redisTimeout,
|
||||
WriteTimeout: redisTimeout,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("parse Redis URI: %w", err)
|
||||
@@ -131,14 +155,14 @@ func (s *Server) Start() error {
|
||||
|
||||
if s.config.EnableHTTPHandler {
|
||||
go func() {
|
||||
log.Printf("Starting HTTP gateway on port %d", s.config.HTTPPort)
|
||||
log.Printf("starting HTTP gateway on port %d", s.config.HTTPPort)
|
||||
if err := runHTTPHandler(s, &addr); err != nil {
|
||||
log.Printf("HTTP gateway failed: %v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
log.Printf("Starting gRPC server on port %d", s.config.GRPCPort)
|
||||
log.Printf("starting gRPC server on port %d", s.config.GRPCPort)
|
||||
|
||||
if err := s.grpcServer.Serve(lis); err != nil {
|
||||
return fmt.Errorf("failed to serve: %w", err)
|
||||
|
||||
@@ -37,11 +37,11 @@ func (s *OrderService) Create(ctx context.Context, item string, quantity int32)
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func (s *OrderService) Get(ctx context.Context, id string) (*domain.Order, error) {
|
||||
func (s *OrderService) Get(ctx context.Context, id uuid.UUID) (*domain.Order, error) {
|
||||
return s.repo.Get(ctx, id)
|
||||
}
|
||||
|
||||
func (s *OrderService) Update(ctx context.Context, id string, item string, quantity int32) (*domain.Order, error) {
|
||||
func (s *OrderService) Update(ctx context.Context, id uuid.UUID, item string, quantity int32) (*domain.Order, error) {
|
||||
order, err := s.repo.Get(ctx, id)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -61,7 +61,7 @@ func (s *OrderService) Update(ctx context.Context, id string, item string, quant
|
||||
return order, nil
|
||||
}
|
||||
|
||||
func (s *OrderService) Delete(ctx context.Context, id string) error {
|
||||
func (s *OrderService) Delete(ctx context.Context, id uuid.UUID) error {
|
||||
return s.repo.Delete(ctx, id)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user