Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move INSERT query of repack.repack_trigger to C #368

Merged
merged 2 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion META.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"name": "pg_repack",
"abstract": "PostgreSQL module for data reorganization",
"description": "Reorganize tables in PostgreSQL databases with minimal locks",
"version": "1.4.8",
"version": "1.4.9",
"maintainer": [
"Beena Emerson <[email protected]>",
"Josh Kupershmidt <[email protected]>",
Expand Down
26 changes: 8 additions & 18 deletions bin/pg_repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ typedef struct repack_table
const char *create_trigger; /* CREATE TRIGGER repack_trigger */
const char *enable_trigger; /* ALTER TABLE ENABLE ALWAYS TRIGGER repack_trigger */
const char *create_table; /* CREATE TABLE table AS SELECT WITH NO DATA*/
const char *dest_tablespace; /* Destination tablespace */
const char *copy_data; /* INSERT INTO */
const char *alter_col_storage; /* ALTER TABLE ALTER COLUMN SET STORAGE */
const char *drop_columns; /* ALTER TABLE DROP COLUMNs */
Expand Down Expand Up @@ -893,9 +894,6 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
{
repack_table table;
StringInfoData copy_sql;
const char *create_table_1;
const char *create_table_2;
const char *tablespace;
const char *ckey;
int c = 0;

Expand All @@ -919,9 +917,8 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.create_trigger = getstr(res, i, c++);
table.enable_trigger = getstr(res, i, c++);

create_table_1 = getstr(res, i, c++);
tablespace = getstr(res, i, c++); /* to be clobbered */
create_table_2 = getstr(res, i, c++);
table.create_table = getstr(res, i, c++);
getstr(res, i, c++); /* tablespace_orig is clobbered */
table.copy_data = getstr(res, i , c++);
table.alter_col_storage = getstr(res, i, c++);
table.drop_columns = getstr(res, i, c++);
Expand All @@ -933,17 +930,7 @@ repack_one_database(const char *orderby, char *errbuf, size_t errsize)
table.sql_delete = getstr(res, i, c++);
table.sql_update = getstr(res, i, c++);
table.sql_pop = getstr(res, i, c++);
tablespace = getstr(res, i, c++);

/* Craft CREATE TABLE SQL */
resetStringInfo(&sql);
appendStringInfoString(&sql, create_table_1);
appendStringInfoString(&sql, tablespace);
appendStringInfoString(&sql, create_table_2);

/* Always append WITH NO DATA to CREATE TABLE SQL*/
appendStringInfoString(&sql, " WITH NO DATA");
table.create_table = sql.data;
table.dest_tablespace = getstr(res, i, c++);

/* Craft Copy SQL */
initStringInfo(&copy_sql);
Expand Down Expand Up @@ -1268,6 +1255,7 @@ repack_one_table(repack_table *table, const char *orderby)
elog(DEBUG2, "create_trigger : %s", table->create_trigger);
elog(DEBUG2, "enable_trigger : %s", table->enable_trigger);
elog(DEBUG2, "create_table : %s", table->create_table);
elog(DEBUG2, "dest_tablespace : %s", table->dest_tablespace);
elog(DEBUG2, "copy_data : %s", table->copy_data);
elog(DEBUG2, "alter_col_storage : %s", table->alter_col_storage ?
table->alter_col_storage : "(skipped)");
Expand Down Expand Up @@ -1530,7 +1518,9 @@ repack_one_table(repack_table *table, const char *orderby)
* Before copying data to the target table, we need to set the column storage
* type if its storage type has been changed from the type default.
*/
command(table->create_table, 0, NULL);
params[0] = utoa(table->target_oid, buffer);
params[1] = table->dest_tablespace;
command(table->create_table, 2, params);
if (table->alter_col_storage)
command(table->alter_col_storage, 0, NULL);
command(table->copy_data, 0, NULL);
Expand Down
54 changes: 43 additions & 11 deletions lib/pg_repack.sql.in
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ LANGUAGE sql STABLE STRICT SET search_path to 'pg_catalog';

CREATE FUNCTION repack.get_index_columns(oid, text) RETURNS text AS
$$
SELECT coalesce(string_agg(quote_ident(attname), $2), '')
SELECT coalesce(string_agg(quote_literal(attname), $2), '')
za-arthur marked this conversation as resolved.
Show resolved Hide resolved
FROM pg_attribute,
(SELECT indrelid,
indkey,
Expand All @@ -43,6 +43,37 @@ CREATE FUNCTION repack.get_order_by(oid, oid) RETURNS text AS
'MODULE_PATHNAME', 'repack_get_order_by'
LANGUAGE C STABLE STRICT;

CREATE FUNCTION repack.create_log_table(oid) RETURNS void AS
$$
BEGIN
EXECUTE 'CREATE TABLE repack.log_' || $1 ||
' (id bigserial PRIMARY KEY,' ||
' pk repack.pk_' || $1 || ',' ||
' row ' || repack.oid2text($1) || ')';
END
$$
LANGUAGE plpgsql;

CREATE FUNCTION repack.create_table(oid, name) RETURNS void AS
$$
BEGIN
EXECUTE 'CREATE TABLE repack.table_' || $1 ||
za-arthur marked this conversation as resolved.
Show resolved Hide resolved
' WITH (' || repack.get_storage_param($1) || ') ' ||
' TABLESPACE ' || quote_ident($2) ||
' AS SELECT ' || repack.get_columns_for_create_as($1) ||
' FROM ONLY ' || repack.oid2text($1) || ' WITH NO DATA';
END
$$
LANGUAGE plpgsql;

CREATE FUNCTION repack.create_index_type(oid, oid) RETURNS void AS
$$
BEGIN
EXECUTE repack.get_create_index_type($1, 'repack.pk_' || $2);
END
$$
LANGUAGE plpgsql;

CREATE FUNCTION repack.get_create_index_type(oid, name) RETURNS text AS
$$
SELECT 'CREATE TYPE ' || $2 || ' AS (' ||
Expand All @@ -66,10 +97,7 @@ $$
SELECT 'CREATE TRIGGER repack_trigger' ||
' AFTER INSERT OR DELETE OR UPDATE ON ' || repack.oid2text($1) ||
' FOR EACH ROW EXECUTE PROCEDURE repack.repack_trigger(' ||
'''INSERT INTO repack.log_' || $1 || '(pk, row) VALUES(' ||
' CASE WHEN $1 IS NULL THEN NULL ELSE (ROW($1.' ||
repack.get_index_columns($2, ', $1.') || ')::repack.pk_' ||
$1 || ') END, $2)'')';
repack.get_index_columns($2, ', ') || ')';
$$
LANGUAGE sql STABLE STRICT;

Expand Down Expand Up @@ -240,13 +268,12 @@ CREATE VIEW repack.tables AS
N.nspname AS schemaname,
PK.indexrelid AS pkid,
CK.indexrelid AS ckid,
repack.get_create_index_type(PK.indexrelid, 'repack.pk_' || R.oid) AS create_pktype,
'CREATE TABLE repack.log_' || R.oid || ' (id bigserial PRIMARY KEY, pk repack.pk_' || R.oid || ', row ' || repack.oid2text(R.oid) || ')' AS create_log,
'SELECT repack.create_index_type(' || PK.indexrelid || ',' || R.oid || ')' AS create_pktype,
'SELECT repack.create_log_table(' || R.oid || ')' AS create_log,
repack.get_create_trigger(R.oid, PK.indexrelid) AS create_trigger,
repack.get_enable_trigger(R.oid) as enable_trigger,
'CREATE TABLE repack.table_' || R.oid || ' WITH (' || repack.get_storage_param(R.oid) || ') TABLESPACE ' AS create_table_1,
coalesce(quote_ident(S.spcname), 'pg_default') as tablespace_orig,
' AS SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS create_table_2,
'SELECT repack.create_table($1, $2)' AS create_table,
coalesce(S.spcname, S2.spcname) AS tablespace_orig,
'INSERT INTO repack.table_' || R.oid || ' SELECT ' || repack.get_columns_for_create_as(R.oid) || ' FROM ONLY ' || repack.oid2text(R.oid) AS copy_data,
repack.get_alter_col_storage(R.oid) AS alter_col_storage,
repack.get_drop_columns(R.oid, 'repack.table_' || R.oid) AS drop_columns,
Expand All @@ -270,6 +297,10 @@ CREATE VIEW repack.tables AS
ON R.oid = CK.indrelid
LEFT JOIN pg_namespace N ON N.oid = R.relnamespace
LEFT JOIN pg_tablespace S ON S.oid = R.reltablespace
CROSS JOIN (SELECT S2.spcname
FROM pg_catalog.pg_database D
JOIN pg_catalog.pg_tablespace S2 ON S2.oid = D.dattablespace
WHERE D.datname = current_database()) S2
WHERE R.relkind = 'r'
AND R.relpersistence = 'p'
AND N.nspname NOT IN ('pg_catalog', 'information_schema')
Expand All @@ -281,7 +312,8 @@ LANGUAGE C STABLE;

CREATE FUNCTION repack.repack_trigger() RETURNS trigger AS
'MODULE_PATHNAME', 'repack_trigger'
LANGUAGE C VOLATILE STRICT SECURITY DEFINER;
LANGUAGE C VOLATILE STRICT SECURITY DEFINER
SET search_path = pg_catalog, pg_temp;

CREATE FUNCTION repack.conflicted_triggers(oid) RETURNS SETOF name AS
$$
Expand Down
29 changes: 21 additions & 8 deletions lib/repack.c
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,11 @@ repack_version(PG_FUNCTION_ARGS)
* @fn Datum repack_trigger(PG_FUNCTION_ARGS)
* @brief Insert a operation log into log-table.
*
* repack_trigger(sql)
* repack_trigger(column1, ..., columnN)
*
* @param sql SQL to insert a operation log into log-table.
* @param column1 A column of the table in primary key/unique index.
* ...
* @param columnN A column of the table in primary key/unique index.
*/
Datum
repack_trigger(PG_FUNCTION_ARGS)
Expand All @@ -158,7 +160,8 @@ repack_trigger(PG_FUNCTION_ARGS)
Datum values[2];
bool nulls[2] = { 0, 0 };
Oid argtypes[2];
const char *sql;
Oid relid;
StringInfo sql;

/* authority check */
must_be_superuser("repack_trigger");
Expand All @@ -167,11 +170,12 @@ repack_trigger(PG_FUNCTION_ARGS)
if (!CALLED_AS_TRIGGER(fcinfo) ||
!TRIGGER_FIRED_AFTER(trigdata->tg_event) ||
!TRIGGER_FIRED_FOR_ROW(trigdata->tg_event) ||
trigdata->tg_trigger->tgnargs != 1)
trigdata->tg_trigger->tgnargs < 1)
elog(ERROR, "repack_trigger: invalid trigger call");

relid = RelationGetRelid(trigdata->tg_relation);

/* retrieve parameters */
sql = trigdata->tg_trigger->tgargs[0];
desc = RelationGetDescr(trigdata->tg_relation);
argtypes[0] = argtypes[1] = trigdata->tg_relation->rd_rel->reltype;

Expand Down Expand Up @@ -200,8 +204,17 @@ repack_trigger(PG_FUNCTION_ARGS)
values[1] = copy_tuple(tuple, desc);
}

/* INSERT INTO repack.log VALUES ($1, $2) */
execute_with_args(SPI_OK_INSERT, sql, 2, argtypes, values, nulls);
/* prepare INSERT query */
sql = makeStringInfo();
appendStringInfo(sql, "INSERT INTO repack.log_%d(pk, row) "
"VALUES(CASE WHEN $1 IS NULL THEN NULL ELSE (ROW(", relid);
appendStringInfo(sql, "$1.%s", quote_identifier(trigdata->tg_trigger->tgargs[0]));
for (int i = 1; i < trigdata->tg_trigger->tgnargs; ++i)
appendStringInfo(sql, ", $1.%s", quote_identifier(trigdata->tg_trigger->tgargs[i]));
appendStringInfo(sql, ")::repack.pk_%d) END, $2)", relid);

/* execute the INSERT query */
execute_with_args(SPI_OK_INSERT, sql->data, 2, argtypes, values, nulls);

SPI_finish();

Expand Down Expand Up @@ -794,7 +807,7 @@ repack_indexdef(PG_FUNCTION_ARGS)
/* specify the new tablespace or the original one if any */
if (tablespace || stmt.tablespace)
appendStringInfo(&str, " TABLESPACE %s",
(tablespace ? NameStr(*tablespace) : stmt.tablespace));
(tablespace ? quote_identifier(NameStr(*tablespace)) : stmt.tablespace));
za-arthur marked this conversation as resolved.
Show resolved Hide resolved

if (stmt.where)
appendStringInfo(&str, " WHERE %s", stmt.where);
Expand Down
2 changes: 1 addition & 1 deletion regress/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ INTVERSION := $(shell echo $$(($$(echo $(VERSION).0 | sed 's/\([[:digit:]]\{1,\}
# Test suite
#

REGRESS := init-extension repack-setup repack-run error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by
REGRESS := init-extension repack-setup repack-run error-on-invalid-idx after-schema repack-check nosuper tablespace get_order_by trigger

USE_PGXS = 1 # use pgxs if not in contrib directory
PGXS := $(shell $(PG_CONFIG) --pgxs)
Expand Down
28 changes: 28 additions & 0 deletions regress/expected/trigger.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
--
-- repack.repack_trigger tests
--
CREATE TABLE trigger_t1 (a int, b int, primary key (a, b));
CREATE INDEX trigger_t1_idx ON trigger_t1 (a, b);
SELECT create_trigger FROM repack.tables WHERE relname = 'public.trigger_t1';
create_trigger
----------------------------------------------------------------------------------------------------------------------------------------------------
CREATE TRIGGER repack_trigger AFTER INSERT OR DELETE OR UPDATE ON public.trigger_t1 FOR EACH ROW EXECUTE PROCEDURE repack.repack_trigger('a', 'b')
(1 row)

SELECT oid AS t1_oid FROM pg_catalog.pg_class WHERE relname = 'trigger_t1'
\gset
CREATE TYPE repack.pk_:t1_oid AS (a integer, b integer);
CREATE TABLE repack.log_:t1_oid (id bigserial PRIMARY KEY, pk repack.pk_:t1_oid, row public.trigger_t1);
CREATE TRIGGER repack_trigger AFTER INSERT OR DELETE OR UPDATE ON trigger_t1
FOR EACH ROW EXECUTE PROCEDURE repack.repack_trigger('a', 'b');
INSERT INTO trigger_t1 VALUES (111, 222);
UPDATE trigger_t1 SET a=333, b=444 WHERE a = 111;
DELETE FROM trigger_t1 WHERE a = 333;
SELECT * FROM repack.log_:t1_oid;
id | pk | row
----+-----------+-----------
1 | | (111,222)
2 | (111,222) | (333,444)
3 | (333,444) |
(3 rows)

21 changes: 21 additions & 0 deletions regress/sql/trigger.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--
-- repack.repack_trigger tests
--

CREATE TABLE trigger_t1 (a int, b int, primary key (a, b));
CREATE INDEX trigger_t1_idx ON trigger_t1 (a, b);

SELECT create_trigger FROM repack.tables WHERE relname = 'public.trigger_t1';

SELECT oid AS t1_oid FROM pg_catalog.pg_class WHERE relname = 'trigger_t1'
\gset
za-arthur marked this conversation as resolved.
Show resolved Hide resolved

CREATE TYPE repack.pk_:t1_oid AS (a integer, b integer);
CREATE TABLE repack.log_:t1_oid (id bigserial PRIMARY KEY, pk repack.pk_:t1_oid, row public.trigger_t1);
CREATE TRIGGER repack_trigger AFTER INSERT OR DELETE OR UPDATE ON trigger_t1
FOR EACH ROW EXECUTE PROCEDURE repack.repack_trigger('a', 'b');

INSERT INTO trigger_t1 VALUES (111, 222);
UPDATE trigger_t1 SET a=333, b=444 WHERE a = 111;
DELETE FROM trigger_t1 WHERE a = 333;
SELECT * FROM repack.log_:t1_oid;