15#ifndef ACTIONENGINE_DISTRIBUTED_GROUPCACHE_H_ 
   16#define ACTIONENGINE_DISTRIBUTED_GROUPCACHE_H_ 
   20#include <absl/base/nullability.h> 
   21#include <absl/functional/any_invocable.h> 
   22#include <absl/random/random.h> 
   25#include "actionengine/distributed/lru.h" 
   26#include "actionengine/distributed/peers.h" 
   27#include "actionengine/distributed/singleflight.h" 
   28#include "actionengine/distributed/sinks.h" 
   30namespace act::distributed {
 
   // Getter is a type-erased callable (absl::AnyInvocable) that receives a key
   // and a Sink pointer annotated as non-null, and reports its outcome as an
   // absl::Status.
   // Presumably this is the user-supplied loader that writes the value for
   // `key` into `sink` — confirm against the code that invokes it.
   32using Getter = absl::AnyInvocable<absl::Status(std::string_view key,
 
   33                                               Sink* absl_nonnull sink)>;
 
   // Counters, each a std::atomic<uint64_t> initialized to 0.
   // NOTE(review): the enclosing type and the code that increments these are
   // not visible here. Going by the names alone they appear to count get
   // requests, cache hits, loads from peers and peer errors, total loads and
   // de-duplicated loads, local loads and their errors, and requests served
   // for peers — confirm each meaning against the implementation.
   36  std::atomic<uint64_t> gets{0};
 
   37  std::atomic<uint64_t> cache_hits{0};
 
   38  std::atomic<uint64_t> peer_loads{0};
 
   39  std::atomic<uint64_t> peer_errors{0};
 
   40  std::atomic<uint64_t> loads{0};
 
   41  std::atomic<uint64_t> loads_deduped{0};
 
   42  std::atomic<uint64_t> local_loads{0};
 
   43  std::atomic<uint64_t> local_load_errors{0};
 
   44  std::atomic<uint64_t> server_requests{0};
 
   // Constructs a Cache. `explicit` prevents implicit conversion from a bare
   // size_t. `max_entries` defaults to 0.
   // NOTE(review): what 0 means (e.g. "no limit") is decided in the
   // definition, which is not shown here — confirm.
   57  explicit Cache(
size_t max_entries = 0);
 
   // Presumably stores `value` under `key` — definition not shown, so the
   // behaviour for an already-present key and any eviction policy should be
   // confirmed there.
   59  void Add(std::string_view key, std::string_view value);
 
   // Looks up `key`. The result is annotated absl_nullable, so callers must
   // check for nullptr (presumably returned on a miss — confirm).
   // NOTE(review): this returns a raw pointer to a const std::string rather
   // than a copy; confirm how long the pointee stays valid with respect to
   // later Add()/Remove() calls and any eviction.
   61  const std::string* absl_nullable Get(std::string_view key);
 
   // Presumably erases the entry stored under `key`, if any — definition not
   // shown; confirm the behaviour for a missing key.
   63  void Remove(std::string_view key);
 
   // Returns the cache's current counters packaged as a CacheStats value.
   67  CacheStats Stats()
 const {
 
   // Hold mu_ for the whole read: every member used below is annotated
   // ABSL_GUARDED_BY(mu_). mu_ is declared mutable, which is what allows
   // locking inside this const method.
   68    act::MutexLock l(&mu_);
 
   // Initializer order: bytes_, lru_.Size(), gets_, hits_, evictions_.
   69    return CacheStats{bytes_, lru_.Size(), gets_, hits_, evictions_};
 
   // Const accessor returning a uint64_t.
   // NOTE(review): only the lock acquisition is visible in this listing; the
   // returned expression is not shown (presumably the byte total, bytes_ —
   // confirm).
   72  uint64_t Size()
 const {
 
   // Take mu_ before touching guarded state.
   73    act::MutexLock l(&mu_);
 
   // Const accessor returning a uint64_t.
   // NOTE(review): only the lock acquisition is visible in this listing; the
   // returned expression is not shown (presumably the number of cached
   // entries — confirm).
   77  uint64_t Items()
 const {
 
   // Take mu_ before touching guarded state.
   78    act::MutexLock l(&mu_);
 
   // Mutex protecting the members below. Declared mutable so const member
   // functions can lock it.
   83  mutable act::Mutex mu_;
 
   // Underlying LRU container; annotated as guarded by mu_.
   84  LruCache lru_ ABSL_GUARDED_BY(mu_);
 
   // Counters reported through CacheStats, all starting at 0 and annotated as
   // guarded by mu_. Names suggest: total bytes held, lookups, lookup hits and
   // evictions — confirm against the code that updates them.
   86  uint64_t bytes_ ABSL_GUARDED_BY(mu_) = 0;
 
   87  uint64_t gets_ ABSL_GUARDED_BY(mu_) = 0;
 
   88  uint64_t hits_ ABSL_GUARDED_BY(mu_) = 0;
 
   89  uint64_t evictions_ ABSL_GUARDED_BY(mu_) = 0;
 
   // Constructs a Group from a name, a cache size and a Getter callable.
   // `getter` is taken by value.
   // NOTE(review): `cache_bytes` presumably is a byte budget for the group's
   // cache(s) — confirm in the definition.
   94  Group(std::string_view name, 
size_t cache_bytes, Getter getter);
 
   // Returns the group's name as a non-owning view of name_.
   // NOTE(review): name_'s declaration is not shown; assuming the Group owns
   // the string, the returned view must not outlive the Group — confirm.
   96  std::string_view Name()
 const { 
return name_; }
 
   // Looks up `key` and yields either a string_view or an error status.
   // NOTE(review): the success value is a non-owning view; confirm which
   // storage it points into and how long that storage remains valid.
   98  absl::StatusOr<std::string_view> LookupCache(std::string_view key);
 
  // Loads the value for `key`, yielding an owning std::string or an error
  // status. Both pointer parameters are annotated as non-null.
  // NOTE(review): presumably `*dest_populated` is set to tell the caller
  // whether `dest` was already written during the load — confirm in the
  // definition.
  100  absl::StatusOr<std::string> Load(std::string_view key,
 
  101                                   Sink* absl_nonnull dest,
 
  102                                   bool* absl_nonnull dest_populated);
 
  // Takes a key/value pair and a target Cache (annotated non-null) and
  // returns an absl::Status.
  // NOTE(review): presumably inserts the pair into `cache`; the conditions
  // under which a non-OK status is returned are not visible here.
  105  absl::Status PopulateCache(std::string_view key, std::string_view value,
 
  106                             Cache* absl_nonnull cache);
 
  // Obtains the value for `key` on this node, yielding a string_view or an
  // error status. `dest` is annotated as non-null.
  // NOTE(review): presumably invokes the group's Getter with `dest`; confirm
  // what storage the returned view refers to and how long it stays valid.
  108  absl::StatusOr<std::string_view> GetLocally(std::string_view key,
 
  109                                              Sink* absl_nonnull dest);
 
  // Requests the value for `key` through `peer` (annotated non-null),
  // yielding an owning std::string or an error status.
  111  absl::StatusOr<std::string> GetFromPeer(ServiceGetter* absl_nonnull peer,
 
  112                                          std::string_view key);
 
  // Peer picker for this group; annotated nullable and null by default.
  117  PeerPicker* absl_nullable peers_ = 
nullptr;
 
  // Cache size setting.
  // NOTE(review): the initializer is cut off in this listing after
  // `std::numeric_limits<`, so the default value cannot be stated from here.
  121  uint64_t cache_bytes_ = std::numeric_limits<
 
  // FlightGroup instance (type from singleflight.h); presumably used to
  // collapse concurrent loads of the same key — confirm at its call sites.
  124  FlightGroup load_group_;
 
  // absl::once_flag; by its name, presumably guards one-time initialization
  // of peers_ — confirm where it is passed to absl::call_once.
  130  absl::once_flag peers_init_once_;
 
  // Creates a group and returns a pointer annotated as non-null. `peers` is
  // nullable and defaults to nullptr.
  // NOTE(review): the listing appears to skip a line between the two
  // parameter lines shown, so part of the parameter list may be missing
  // from this view.
  // NOTE(review): `cache_bytes` is uint64_t here but size_t in Group's
  // constructor; confirm the narrowing is intended on 32-bit targets.
  // NOTE(review): the registry stores groups as std::unique_ptr<Group>, so
  // the returned pointer is presumably non-owning — confirm.
  135  Group* absl_nonnull NewGroup(std::string_view name, uint64_t cache_bytes,
 
  137                               PeerPicker* absl_nullable peers = 
nullptr);
 
  // Looks up a group by name. The result is annotated absl_nullable, so
  // callers must check for nullptr (presumably returned when no group with
  // that name exists — confirm).
  139  Group* absl_nullable GetGroup(std::string_view name);
 
  // Mutex protecting groups_. Declared mutable so it can be locked from
  // const member functions.
  142  mutable act::Mutex mu_;
 
  // Map from a std::string key to an owning std::unique_ptr<Group>;
  // annotated as guarded by mu_. The key is presumably the group name.
  143  absl::flat_hash_map<std::string, std::unique_ptr<Group>> groups_
 
  144      ABSL_GUARDED_BY(mu_);
 
Concurrency utilities for ActionEngine.