Action Engine
Loading...
Searching...
No Matches
put.lua.h
1
// Copyright 2025 Google LLC
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
// http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
#ifndef ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_
16
#define ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_
17
18
#include <array>
#include <string>
#include <string_view>

#include "actionengine/redis/chunk_store_ops/unindent.h"
21
22
namespace
act::redis {
23
24
// Key-name templates passed to put.lua, listed in KEYS[1]..KEYS[8] order
// (stream, seq->id hash, offset->seq sorted set, offset cursor, final seq,
// status, closed flag, events channel).
//
// Each entry carries a `{}` placeholder; the script header documents the
// resulting keys as `<id>:...`, so the placeholder is presumably replaced with
// the store id by the caller -- confirm against the call site.
//
// Declared `inline` so that all translation units including this header share
// one object instead of each constructing its own copy at static-init time.
inline const std::array<std::string, 8> kPutScriptKeys = {
    "{}:s",          "{}:seq_to_id", "{}:offset_to_seq", "{}:offset_cursor",
    "{}:final_seq",  "{}:status",    "{}:closed",        "{}:events"};
27
28
constexpr
std::string_view kPutScriptCode = R
"(
29
-- put.lua
30
-- KEYS[1]: <id>:s
31
-- KEYS[2]: <id>:seq_to_id
32
-- KEYS[3]: <id>:offset_to_seq
33
-- KEYS[4]: <id>:offset_cursor
34
-- KEYS[5]: <id>:final_seq
35
-- KEYS[6]: <id>:status
36
-- KEYS[7]: <id>:closed
37
-- KEYS[8]: <id>:events
38
-- ARGV[1]: seq
39
-- ARGV[2]: data
40
-- ARGV[3]: final ('1' or '0')
41
-- ARGV[4]: ttl_seconds (0 or less means no expiration)
42
-- ARGV[5]: status_ttl_seconds (0 or less means no expiration)
43
44
local seq = ARGV[1]
45
local data = ARGV[2]
46
local is_final = ARGV[3]
47
local ttl_seconds = tonumber(ARGV[4])
48
local status_ttl_seconds = tonumber(ARGV[5])
49
50
-- Define all keys used by the abstraction
51
local stream_key = KEYS[1]
52
local seq_to_id_key = KEYS[2]
53
local offset_to_seq_key = KEYS[3]
54
local offset_cursor_key = KEYS[4]
55
local final_seq_key = KEYS[5]
56
local status_key = KEYS[6]
57
local closed_key = KEYS[7]
58
local events_key = KEYS[8]
59
60
-- --- Main Put Logic (from put_v2.lua) ---
61
if redis.call('EXISTS', closed_key) == 1 then
62
return {err = 'PUT_CLOSED'}
63
end
64
65
if redis.call('HEXISTS', seq_to_id_key, seq) == 1 then
66
return {err = 'SEQ_EXISTS'}
67
end
68
69
local offset = tonumber(redis.call('INCR', offset_cursor_key)) - 1
70
local id_in_stream = redis.call('XADD', stream_key, '*', 'seq', seq, 'data', data)
71
redis.call('HSET', seq_to_id_key, seq, id_in_stream)
72
redis.call('ZADD', offset_to_seq_key, offset, seq)
73
74
if is_final == '1' then
75
redis.call('SET', final_seq_key, seq)
76
end
77
-- --- End of Main Put Logic ---
78
79
-- --- TTL Application Logic ---
80
if ttl_seconds > 0 then
81
redis.call('EXPIRE', stream_key, ttl_seconds)
82
redis.call('EXPIRE', seq_to_id_key, ttl_seconds)
83
redis.call('EXPIRE', offset_to_seq_key, ttl_seconds)
84
redis.call('EXPIRE', offset_cursor_key, ttl_seconds)
85
redis.call('EXPIRE', status_key, status_ttl_seconds)
86
redis.call('EXPIRE', closed_key, status_ttl_seconds)
87
88
-- Also expire metadata keys if they were created
89
if is_final == '1' then
90
redis.call('EXPIRE', final_seq_key, ttl_seconds)
91
end
92
end
93
94
redis.call('PUBLISH', events_key, 'NEW:' .. offset .. ':' .. seq .. ':' .. id_in_stream)
95
96
return {offset, id_in_stream}
97
)"_unindent;
98
99
}
// namespace act::redis
100
101
#endif
// ACTIONENGINE_REDIS_CHUNK_STORE_OPS_PUT_LUA_H_
src
actionengine
redis
chunk_store_ops
put.lua.h
Generated by
1.13.2