Action Engine
Loading...
Searching...
No Matches
put.lua.h
1// Copyright 2025 Google LLC
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#ifndef ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_
16#define ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_
17
#include <array>
#include <string>
#include <string_view>

#include "actionengine/redis/chunk_store_ops/unindent.h"
21
22namespace act::redis {
23
// Key-name templates for the put script, listed in the exact order the script
// reads them as KEYS[1]..KEYS[8]. Each "{}" is a placeholder that lines up with
// the "<id>" prefix in the script's header comment; presumably the caller
// substitutes the stream id before invoking the script (TODO confirm against
// the call site).
//
// Declared `inline` so that every translation unit including this header
// shares a single array instead of constructing its own copy of the eight
// strings during static initialization.
inline const std::array<std::string, 8> kPutScriptKeys = {
    "{}:s",         "{}:seq_to_id", "{}:offset_to_seq", "{}:offset_cursor",
    "{}:final_seq", "{}:status",    "{}:closed",        "{}:events"};
27
28constexpr std::string_view kPutScriptCode = R"(
29 -- put.lua
30 -- KEYS[1]: <id>:s
31 -- KEYS[2]: <id>:seq_to_id
32 -- KEYS[3]: <id>:offset_to_seq
33 -- KEYS[4]: <id>:offset_cursor
34 -- KEYS[5]: <id>:final_seq
35 -- KEYS[6]: <id>:status
36 -- KEYS[7]: <id>:closed
37 -- KEYS[8]: <id>:events
38 -- ARGV[1]: seq
39 -- ARGV[2]: data
40 -- ARGV[3]: final ('1' or '0')
41 -- ARGV[4]: ttl_seconds (0 or less means no expiration)
42 -- ARGV[5]: status_ttl_seconds (0 or less means no expiration)
43
44 local seq = ARGV[1]
45 local data = ARGV[2]
46 local is_final = ARGV[3]
47 local ttl_seconds = tonumber(ARGV[4])
48 local status_ttl_seconds = tonumber(ARGV[5])
49
50 -- Define all keys used by the abstraction
51 local stream_key = KEYS[1]
52 local seq_to_id_key = KEYS[2]
53 local offset_to_seq_key = KEYS[3]
54 local offset_cursor_key = KEYS[4]
55 local final_seq_key = KEYS[5]
56 local status_key = KEYS[6]
57 local closed_key = KEYS[7]
58 local events_key = KEYS[8]
59
60 -- --- Main Put Logic (from put_v2.lua) ---
61 if redis.call('EXISTS', closed_key) == 1 then
62 return {err = 'PUT_CLOSED'}
63 end
64
65 if redis.call('HEXISTS', seq_to_id_key, seq) == 1 then
66 return {err = 'SEQ_EXISTS'}
67 end
68
69 local offset = tonumber(redis.call('INCR', offset_cursor_key)) - 1
70 local id_in_stream = redis.call('XADD', stream_key, '*', 'seq', seq, 'data', data)
71 redis.call('HSET', seq_to_id_key, seq, id_in_stream)
72 redis.call('ZADD', offset_to_seq_key, offset, seq)
73
74 if is_final == '1' then
75 redis.call('SET', final_seq_key, seq)
76 end
77 -- --- End of Main Put Logic ---
78
79 -- --- TTL Application Logic ---
80 if ttl_seconds > 0 then
81 redis.call('EXPIRE', stream_key, ttl_seconds)
82 redis.call('EXPIRE', seq_to_id_key, ttl_seconds)
83 redis.call('EXPIRE', offset_to_seq_key, ttl_seconds)
84 redis.call('EXPIRE', offset_cursor_key, ttl_seconds)
85 redis.call('EXPIRE', status_key, status_ttl_seconds)
86 redis.call('EXPIRE', closed_key, status_ttl_seconds)
87
88 -- Also expire metadata keys if they were created
89 if is_final == '1' then
90 redis.call('EXPIRE', final_seq_key, ttl_seconds)
91 end
92 end
93
94 redis.call('PUBLISH', events_key, 'NEW:' .. offset .. ':' .. seq .. ':' .. id_in_stream)
95
96 return {offset, id_in_stream}
97)"_unindent;
98
99} // namespace act::redis
100
101#endif // ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_