-
-
Notifications
You must be signed in to change notification settings - Fork 19
/
pg_stats.sql
240 lines (214 loc) · 7.36 KB
/
pg_stats.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
-- Gauges
--
-- Readings whose value changes over time (seeders, leechers).
-- One row per recorded reading.
CREATE TABLE gauges (
    "kind"      TEXT      NOT NULL,  -- which gauge the reading belongs to
    "time"      TIMESTAMP NOT NULL,  -- when the reading was stored
    "info_hash" BYTEA,               -- nullable
    "value"     BIGINT    DEFAULT 1  -- the reading itself
);

-- Serves lookups by ("kind", "info_hash") ordered by "time"; "value" is
-- included as the trailing index column.
CREATE INDEX gauges_kind_info_hash_time_value
    ON gauges ("kind", "info_hash", "time", "value");
-- Record a gauge reading unless it repeats the most recent stored value.
--
-- Parameters:
--   e_kind       gauge kind to record
--   e_info_hash  info_hash the reading belongs to; may be NULL
--   e_value      the reading
--
-- A new row stamped NOW() is inserted when there is no earlier reading for
-- (e_kind, e_info_hash), when the latest stored value is NULL, or when the
-- latest stored value differs from e_value. Otherwise nothing is written.
CREATE OR REPLACE FUNCTION set_gauge(
    "e_kind" TEXT,
    "e_info_hash" BYTEA,
    "e_value" BIGINT
) RETURNS void AS $$
DECLARE
    "prev_value" BIGINT;
BEGIN
    -- Deduplication: fetch the latest value stored for this gauge.
    -- gauges."info_hash" is nullable and `"info_hash" = NULL` never matches,
    -- so a NULL e_info_hash needs its own IS NULL lookup; without it every
    -- call for a NULL info_hash would insert a duplicate row.
    IF e_info_hash IS NULL THEN
        SELECT "value" INTO "prev_value"
        FROM gauges
        WHERE "kind" = e_kind
          AND "info_hash" IS NULL
        ORDER BY "time" DESC
        LIMIT 1;
    ELSE
        SELECT "value" INTO "prev_value"
        FROM gauges
        WHERE "kind" = e_kind
          AND "info_hash" = e_info_hash
        ORDER BY "time" DESC
        LIMIT 1;
    END IF;

    IF prev_value IS NULL OR prev_value != e_value THEN
        -- Add new datum:
        INSERT INTO gauges
            ("kind", "time", "info_hash", "value")
        VALUES (e_kind, NOW(), e_info_hash, e_value);
    END IF;
END;
$$ LANGUAGE plpgsql;
-- Counters
--
-- Values that are added up (up, down, up_seeder).
CREATE TABLE counters (
    "kind"      TEXT      NOT NULL,  -- which counter the row belongs to
    "time"      TIMESTAMP NOT NULL,  -- time bucket of the row
    "info_hash" BYTEA,               -- nullable
    "value"     BIGINT    NOT NULL   -- amount accumulated in this row
);

-- Serves lookups by ("kind", "info_hash", "time"); "value" is included as
-- the trailing index column.
CREATE INDEX counters_kind_info_hash_time_value
    ON counters ("kind", "info_hash", "time", "value");

-- Partial index over rows whose info_hash looks like 'GET /….torrent'.
-- A query must spell out this predicate explicitly for the index to be used.
CREATE INDEX counters_get
    ON counters ("info_hash", "time")
    WHERE info_hash LIKE 'GET /%.torrent';
-- Add e_value to the counter (e_kind, e_info_hash) in the current period.
--
-- Parameters:
--   e_kind       counter kind
--   e_info_hash  info_hash the counter belongs to; may be NULL
--   e_value      amount to add; 0 is a no-op
--
-- The current period's row is incremented when it exists; otherwise a new
-- row is inserted for the period.
CREATE OR REPLACE FUNCTION add_counter(
    "e_kind" TEXT,
    "e_info_hash" BYTEA,
    "e_value" BIGINT
) RETURNS void AS $$
DECLARE
    -- Period width in seconds (EXTRACT(EPOCH ...) yields seconds).
    period_length BIGINT := 600;
    -- Start of the current period: NOW() rounded down to a multiple of
    -- period_length seconds since the epoch.
    -- TODO: reuse align_timestamp()
    period TIMESTAMP := TO_TIMESTAMP(
        FLOOR(
            EXTRACT(EPOCH FROM NOW())
            / period_length)
        * period_length);
BEGIN
    IF e_value = 0 THEN
        -- Nothing to do
        RETURN;
    END IF;

    -- counters."info_hash" is nullable and `"info_hash" = NULL` never
    -- matches, so a NULL e_info_hash is matched with IS NULL. Without this,
    -- every call with a NULL info_hash would add a fresh row instead of
    -- accumulating into the period's existing row.
    IF e_info_hash IS NULL THEN
        UPDATE counters SET "value" = "value" + e_value
        WHERE "kind" = e_kind
          AND "info_hash" IS NULL
          AND "time" = period;
    ELSE
        UPDATE counters SET "value" = "value" + e_value
        WHERE "kind" = e_kind
          AND "info_hash" = e_info_hash
          AND "time" = period;
    END IF;

    -- FOUND reflects whichever UPDATE above ran.
    IF NOT FOUND THEN
        INSERT INTO counters
            ("kind", "time", "info_hash", "value")
        VALUES (e_kind, period, e_info_hash, e_value);
    END IF;
END;
$$ LANGUAGE plpgsql;
-- Round a timestamp down to a whole multiple of "interval" seconds since
-- the epoch.
--
-- Parameters:
--   ts ($1)        timestamp to align
--   interval ($2)  bucket width in seconds
-- Returns: the start of the bucket containing ts, as TIMESTAMP WITH TIME ZONE.
CREATE OR REPLACE FUNCTION align_timestamp (
    "ts"       TIMESTAMP,
    "interval" INT
) RETURNS TIMESTAMP WITH TIME ZONE AS $$
    SELECT TO_TIMESTAMP(
        FLOOR(EXTRACT(EPOCH FROM $1) / $2) * $2
    );
$$ LANGUAGE SQL IMMUTABLE;
-- Per-info_hash download totals: overall, and over the last 30 days,
-- 7 days and 1 day.
CREATE TABLE downloaded_stats (
    "info_hash"  BYTEA PRIMARY KEY,
    downloaded   BIGINT,  -- all-time total
    downloaded30 BIGINT,  -- last 30 days
    downloaded7  BIGINT,  -- last 7 days
    downloaded1  BIGINT   -- last 1 day
);

-- One descending index per total.
CREATE INDEX downloaded_stats_downloaded
    ON downloaded_stats (downloaded DESC);
CREATE INDEX downloaded_stats_downloaded30
    ON downloaded_stats (downloaded30 DESC);
CREATE INDEX downloaded_stats_downloaded7
    ON downloaded_stats (downloaded7 DESC);
CREATE INDEX downloaded_stats_downloaded1
    ON downloaded_stats (downloaded1 DESC);
-- Recompute the downloaded_stats row for one info_hash from counters.
--
-- Parameters:
--   t_info_hash  info_hash whose totals are rebuilt; NULL is ignored
--
-- Sums "value" over counters rows of kind 'complete' or 'complete_w' for
-- the info_hash: all-time, and restricted to the last 30 days, 7 days and
-- 1 day. The existing downloaded_stats row is replaced by the new totals;
-- a total with no contributing rows is stored as 0.
CREATE OR REPLACE FUNCTION update_downloaded_stats(
    "t_info_hash" BYTEA
) RETURNS void AS $$
DECLARE
    "t_downloaded" BIGINT;
    "t_downloaded30" BIGINT;
    "t_downloaded7" BIGINT;
    "t_downloaded1" BIGINT;
BEGIN
    -- downloaded_stats.info_hash is the primary key and cannot hold NULL;
    -- counters.info_hash is nullable, so a NULL can arrive here. Skipping it
    -- avoids a not-null violation that would abort the caller's statement.
    IF t_info_hash IS NULL THEN
        RETURN;
    END IF;

    DELETE FROM downloaded_stats WHERE info_hash = t_info_hash;

    -- One pass over the matching counters rows; each windowed total only
    -- counts rows newer than its cutoff (CASE yields NULL otherwise, which
    -- SUM ignores).
    SELECT
        COALESCE(SUM("value"), 0),
        COALESCE(SUM(CASE WHEN "time" > (NOW() - '30 days'::INTERVAL) THEN "value" END), 0),
        COALESCE(SUM(CASE WHEN "time" > (NOW() - '7 days'::INTERVAL) THEN "value" END), 0),
        COALESCE(SUM(CASE WHEN "time" > (NOW() - '1 day'::INTERVAL) THEN "value" END), 0)
    INTO "t_downloaded", "t_downloaded30", "t_downloaded7", "t_downloaded1"
    FROM counters
    WHERE ("kind"='complete' OR "kind"='complete_w')
      AND "info_hash" = t_info_hash;

    INSERT INTO downloaded_stats (info_hash, downloaded, downloaded30, downloaded7, downloaded1)
    VALUES (t_info_hash, t_downloaded, t_downloaded30, t_downloaded7, t_downloaded1);
END;
$$ LANGUAGE plpgsql;
-- Trigger function: rebuild downloaded_stats for the info_hash of the row
-- being written. Meant only for a trigger restricted to "kind"='complete'.
CREATE OR REPLACE FUNCTION counters_update_downloaded_stats()
RETURNS trigger AS $$
BEGIN
    PERFORM update_downloaded_stats(NEW."info_hash");
    -- Hand the row back unchanged.
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- Push the "downloaded" count to the "scraped" cache: after every insert
-- or update of a counters row with kind 'complete', run
-- counters_update_downloaded_stats() for that row.
CREATE TRIGGER counters_update_downloaded_stats
    AFTER INSERT OR UPDATE ON counters
    FOR EACH ROW
    WHEN (NEW.kind = 'complete')
    EXECUTE PROCEDURE counters_update_downloaded_stats();
-- Run periodically to update downloaded_stats.downloaded{30,7,1}
--
-- Calls update_downloaded_stats() once for every distinct non-NULL
-- info_hash that has a 'complete' or 'complete_w' counters row dated
-- between 31 days ago and 1 day ago (both bounds inclusive).
CREATE OR REPLACE FUNCTION update_all_downloaded_stats() RETURNS void AS $$
DECLARE
    t_info_hash BYTEA;
BEGIN
    FOR t_info_hash IN
        SELECT DISTINCT "info_hash"
        FROM counters
        WHERE (kind = 'complete' OR "kind"='complete_w')
          AND time >= (NOW() - '31 days'::INTERVAL)
          AND time <= (NOW() - '1 day'::INTERVAL)
          -- counters.info_hash is nullable, but downloaded_stats is keyed
          -- by info_hash; a NULL here would make the whole refresh fail.
          AND "info_hash" IS NOT NULL
    LOOP
        PERFORM update_downloaded_stats(t_info_hash);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Partial index limited to the 'complete' / 'complete_w' counters rows.
CREATE INDEX counters_completes
    ON counters (kind, time, info_hash)
    WHERE kind = 'complete' OR kind = 'complete_w';
-- Compaction code

-- Collapse the counters rows in [p_start, p_end) into one row per
-- (kind, info_hash).
--
-- Parameters:
--   p_start  inclusive lower bound on "time"
--   p_end    exclusive upper bound on "time"
--
-- Each group's rows are deleted and replaced by a single row carrying the
-- group's summed "value" and its earliest "time".
CREATE OR REPLACE FUNCTION compact_counters(
    "p_start" TIMESTAMP,
    "p_end" TIMESTAMP
) RETURNS void AS $$
DECLARE
    "t_info_hash" BYTEA;
    "t_kind" TEXT;
    "t_total" BIGINT;
    "t_start" TIMESTAMP;
BEGIN
    FOR t_kind, t_info_hash, t_total, t_start IN
        SELECT kind, info_hash, SUM("value"), MIN("time")
        FROM counters
        WHERE "time" >= p_start
          AND "time" < p_end
        GROUP BY kind, info_hash
    LOOP
        -- GROUP BY puts all NULL info_hash rows in one group, but
        -- `"info_hash" = NULL` matches nothing. Without the IS NULL branch
        -- the old rows would survive and the inserted total would count
        -- them a second time.
        IF t_info_hash IS NULL THEN
            DELETE FROM counters
            WHERE "kind" = t_kind
              AND "info_hash" IS NULL
              AND "time" >= p_start
              AND "time" < p_end;
        ELSE
            DELETE FROM counters
            WHERE "kind" = t_kind
              AND "info_hash" = t_info_hash
              AND "time" >= p_start
              AND "time" < p_end;
        END IF;

        INSERT INTO counters ("kind", "time", "info_hash", "value")
        VALUES (t_kind, t_start, t_info_hash, t_total);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- Collapse the gauges rows in [p_start, p_end) into one row per
-- (kind, info_hash).
--
-- Parameters:
--   p_start  inclusive lower bound on "time"
--   p_end    exclusive upper bound on "time"
--
-- Each group's rows are deleted and replaced by a single gauges row
-- carrying the group's average "value" (converted to BIGINT) and its
-- earliest "time".
CREATE OR REPLACE FUNCTION compact_gauges(
    "p_start" TIMESTAMP,
    "p_end" TIMESTAMP
) RETURNS void AS $$
DECLARE
    "t_info_hash" BYTEA;
    "t_kind" TEXT;
    "t_total" BIGINT;  -- holds the group's average despite the name
    "t_start" TIMESTAMP;
BEGIN
    FOR t_kind, t_info_hash, t_total, t_start IN
        SELECT kind, info_hash, AVG("value"), MIN("time")
        FROM gauges
        WHERE "time" >= p_start
          AND "time" < p_end
        GROUP BY kind, info_hash
    LOOP
        -- GROUP BY puts all NULL info_hash rows in one group, but
        -- `"info_hash" = NULL` matches nothing, so NULL needs its own
        -- IS NULL delete or the old rows would survive next to the
        -- compacted one.
        IF t_info_hash IS NULL THEN
            DELETE FROM gauges
            WHERE "kind" = t_kind
              AND "info_hash" IS NULL
              AND "time" >= p_start
              AND "time" < p_end;
        ELSE
            DELETE FROM gauges
            WHERE "kind" = t_kind
              AND "info_hash" = t_info_hash
              AND "time" >= p_start
              AND "time" < p_end;
        END IF;

        -- The compacted reading goes back into gauges. Writing it to
        -- counters would drop the gauge history and add the averages to
        -- the counter totals.
        INSERT INTO gauges ("kind", "time", "info_hash", "value")
        VALUES (t_kind, t_start, t_info_hash, t_total);
    END LOOP;
END;
$$ LANGUAGE plpgsql;