15#ifndef ACTIONENGINE_SERVICE_SESSION_H_ 
   16#define ACTIONENGINE_SERVICE_SESSION_H_ 
   21#include <absl/base/nullability.h> 
   22#include <absl/base/thread_annotations.h> 
   23#include <absl/container/flat_hash_map.h> 
   24#include <absl/status/status.h> 
   25#include <absl/time/time.h> 
   31#include "actionengine/net/stream.h" 
   40  static constexpr absl::Duration kFiberCancellationTimeout = absl::Seconds(3);
 
   41  static constexpr absl::Duration kActionDetachTimeout = absl::Seconds(10);
 
   43  ActionContext() = 
default;
 
   46  absl::Status Dispatch(std::shared_ptr<Action> action);
 
   50  void WaitForActionsToDetach(
 
   51      absl::Duration cancel_timeout = kFiberCancellationTimeout,
 
   52      absl::Duration detach_timeout = kActionDetachTimeout);
 
   55  void CancelContextInternal() ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   57  std::unique_ptr<thread::Fiber> ExtractActionFiber(Action* absl_nonnull action)
 
   58      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   60  void WaitForActionsToDetachInternal(
 
   61      absl::Duration cancel_timeout = kFiberCancellationTimeout,
 
   62      absl::Duration detach_timeout = kActionDetachTimeout)
 
   63      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
   66  absl::flat_hash_map<Action*, std::unique_ptr<thread::Fiber>> running_actions_
 
   68  bool cancelled_ ABSL_GUARDED_BY(mu_) = false;
 
   69  act::CondVar cv_ ABSL_GUARDED_BY(mu_);
 
   72struct StreamDispatchTask {
 
   73  std::shared_ptr<WireStream> stream;
 
   74  std::unique_ptr<thread::Fiber> fiber;
 
   76  thread::PermanentEvent done;
 
  103                   ChunkStoreFactory chunk_store_factory = {});
 
  110  [[nodiscard]] 
AsyncNode* absl_nonnull GetNode(
 
  112      const ChunkStoreFactory& chunk_store_factory = {}) 
const;
 
  114  void DispatchFrom(
const std::shared_ptr<WireStream>& stream,
 
  115                    absl::AnyInvocable<
void()> on_done = {});
 
  119  void StopDispatchingFrom(
WireStream* absl_nonnull stream);
 
  120  void StopDispatchingFromAll();
 
  122  [[nodiscard]] absl::Duration GetRecvTimeout()
 const { 
return recv_timeout_; }
 
  124  [[nodiscard]] 
NodeMap* absl_nullable GetNodeMap()
 const { 
return node_map_; }
 
  126  [[nodiscard]] 
ActionRegistry* absl_nullable GetActionRegistry()
 const {
 
  127    return action_registry_;
 
  130  void SetActionRegistry(
ActionRegistry* absl_nullable action_registry) {
 
  131    action_registry_ = action_registry;
 
  135  void JoinDispatchers(
bool cancel = 
false) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_);
 
  138  bool joined_ ABSL_GUARDED_BY(mu_) = 
false;
 
  139  absl::flat_hash_map<WireStream*, std::unique_ptr<thread::Fiber>>
 
  140      dispatch_tasks_ ABSL_GUARDED_BY(mu_){};
 
  141  const absl::Duration recv_timeout_ = absl::Seconds(3600000);
 
  143  NodeMap* absl_nonnull 
const node_map_;
 
  145  ChunkStoreFactory chunk_store_factory_;
 
  147  std::unique_ptr<ActionContext> action_context_ = 
nullptr;
 
 
An interface for ActionEngine Action launch helper / handler context.
Concurrency utilities for ActionEngine.
Provides the AsyncNode class for handling asynchronous data streams.
Definition async_node.h:70
Session(NodeMap *absl_nonnull node_map, ActionRegistry *absl_nullable action_registry=nullptr, ChunkStoreFactory chunk_store_factory={})
Definition session.cc:143
Provides the NodeMap class for managing ActionEngine nodes.
A registry for ActionEngine actions.
An abstract interface for raw data storage and retrieval for ActionEngine nodes.
ActionEngine data structures used to implement actions and nodes (data streams).