Skip to content

Commit

Permalink
IGNITE-24030: Schema commands DDL handler.
Browse files Browse the repository at this point in the history
  • Loading branch information
lowka committed Jan 11, 2025
1 parent 3739f46 commit d311a3a
Show file tree
Hide file tree
Showing 23 changed files with 752 additions and 51 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

/**
* This exception is thrown when a schema cannot be created because another schema with
* the same name already exists in a catalog.
*
* <p>This exception is used to properly handle IF NOT EXISTS flag in ddl command handler.
*/
public class SchemaExistsException extends CatalogValidationException {
private static final long serialVersionUID = 6017288060655861875L;

/**
* Constructor,
*
* @param message Error message.
*/
public SchemaExistsException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.catalog;

/**
* This exception is thrown when a schema is not found in a catalog.
*
* <p>This exception is used to properly handle IF EXISTS flag in ddl command handler.
*/
public class SchemaNotFoundException extends CatalogValidationException {
private static final long serialVersionUID = 6017288060655861875L;

/**
* Constructor,
*
* @param message Error message.
*/
public SchemaNotFoundException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.SchemaExistsException;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
Expand All @@ -40,22 +40,33 @@ public class CreateSchemaCommand implements CatalogCommand {

private final String schemaName;

private CreateSchemaCommand(String schemaName) {
private final boolean ifNotExists;

private CreateSchemaCommand(String schemaName, boolean ifNotExists) {
validateIdentifier(schemaName, "Name of the schema");

this.schemaName = schemaName;
this.ifNotExists = ifNotExists;
}

public boolean ifNotExists() {
return ifNotExists;
}

/** {@inheritDoc} */
@Override
public List<UpdateEntry> get(Catalog catalog) {
int id = catalog.objectIdGenState();

if (catalog.schema(schemaName) != null) {
throw new CatalogValidationException(format("Schema with name '{}' already exists", schemaName));
CatalogSchemaDescriptor schema = catalog.schema(schemaName);

if (ifNotExists && schema != null) {
return List.of();
} else if (schema != null) {
throw new SchemaExistsException(format("Schema with name '{}' already exists.", schemaName));
}

CatalogSchemaDescriptor schema = new CatalogSchemaDescriptor(
CatalogSchemaDescriptor newSchema = new CatalogSchemaDescriptor(
id,
schemaName,
new CatalogTableDescriptor[0],
Expand All @@ -65,7 +76,7 @@ public List<UpdateEntry> get(Catalog catalog) {
);

return List.of(
new NewSchemaEntry(schema),
new NewSchemaEntry(newSchema),
new ObjectIdGenUpdateEntry(1)
);
}
Expand All @@ -80,17 +91,26 @@ public static class Builder implements CreateSchemaCommandBuilder {

private String name;

private boolean ifNotExists;

/** {@inheritDoc} */
@Override
public CreateSchemaCommandBuilder name(String name) {
this.name = name;
return this;
}

/** {@inheritDoc} */
@Override
public CreateSchemaCommandBuilder ifNotExists(boolean value) {
this.ifNotExists = value;
return this;
}

/** {@inheritDoc} */
@Override
public CatalogCommand build() {
return new CreateSchemaCommand(name);
return new CreateSchemaCommand(name, ifNotExists);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public interface CreateSchemaCommandBuilder {
/** Sets schema name. Should not be null or blank. */
CreateSchemaCommandBuilder name(String name);

/** Sets a flag indicating whether {@code IF NOT EXISTS} option was specified. */
CreateSchemaCommandBuilder ifNotExists(boolean value);

/** Creates new schema command. */
CatalogCommand build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,20 @@

import static org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.validateIdentifier;
import static org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogCommand;
import org.apache.ignite.internal.catalog.CatalogValidationException;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.SchemaNotFoundException;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
import org.apache.ignite.internal.catalog.storage.DropSchemaEntry;
import org.apache.ignite.internal.catalog.storage.DropTableEntry;
import org.apache.ignite.internal.catalog.storage.RemoveIndexEntry;
import org.apache.ignite.internal.catalog.storage.UpdateEntry;

/**
Expand All @@ -46,18 +48,26 @@ public static DropSchemaCommandBuilder builder() {

private final boolean cascade;

private final boolean ifExists;

/**
* Constructor.
*
* @param schemaName Name of the schema.
* @param cascade Flag indicating forced deletion of a non-empty schema.
* @param ifExists Flag indicating
* @throws CatalogValidationException if any of restrictions above is violated.
*/
private DropSchemaCommand(String schemaName, boolean cascade) throws CatalogValidationException {
private DropSchemaCommand(String schemaName, boolean cascade, boolean ifExists) throws CatalogValidationException {
validateIdentifier(schemaName, "Name of the schema");

this.schemaName = schemaName;
this.cascade = cascade;
this.ifExists = ifExists;
}

public boolean ifExists() {
return ifExists;
}

@Override
Expand All @@ -66,17 +76,30 @@ public List<UpdateEntry> get(Catalog catalog) {
throw new CatalogValidationException("System schema can't be dropped [name={}].", schemaName);
}

CatalogSchemaDescriptor schema = schemaOrThrow(catalog, schemaName);
CatalogSchemaDescriptor schema;

if (ifExists) {
schema = catalog.schema(schemaName);
if (schema == null) {
return List.of();
}
} else {
schema = schemaOrThrow(catalog, schemaName);
}

if (!cascade && !schema.isEmpty()) {
throw new CatalogValidationException("Schema '{}' is not empty. Use CASCADE to drop it anyway.", schemaName);
throw new SchemaNotFoundException(format("Schema '{}' is not empty. Use CASCADE to drop it anyway.", schemaName));
}

List<UpdateEntry> updateEntries = new ArrayList<>();

for (CatalogIndexDescriptor idx : schema.indexes()) {
updateEntries.add(new DropIndexEntry(idx.id()));
}
Arrays.stream(schema.indexes())
.forEach(index -> {
// We can remove AVAILABLE/STOPPED index right away as the only reason to have an index in the STOPPING state is to
// allow RW transactions started before the index drop to write to it, but as the table is already dropped,
// the writes are not possible in any case.
updateEntries.add(new RemoveIndexEntry(index.id()));
});

for (CatalogTableDescriptor tbl : schema.tables()) {
updateEntries.add(new DropTableEntry(tbl.id()));
Expand All @@ -93,6 +116,7 @@ public List<UpdateEntry> get(Catalog catalog) {
private static class Builder implements DropSchemaCommandBuilder {
private String schemaName;
private boolean cascade;
private boolean ifExists;

@Override
public DropSchemaCommandBuilder name(String schemaName) {
Expand All @@ -108,9 +132,16 @@ public DropSchemaCommandBuilder cascade(boolean cascade) {
return this;
}

@Override
public DropSchemaCommandBuilder ifExists(boolean ifExists) {
this.ifExists = ifExists;

return this;
}

@Override
public CatalogCommand build() {
return new DropSchemaCommand(schemaName, cascade);
return new DropSchemaCommand(schemaName, cascade, ifExists);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ public interface DropSchemaCommandBuilder {
/** Sets flag indicating forced deletion of a non-empty schema. */
DropSchemaCommandBuilder cascade(boolean cascade);

/** Sets a flag indicating whether {@code IF EXISTS} option was specified. */
DropSchemaCommandBuilder ifExists(boolean ifExists);

/** Returns a command with specified parameters. */
CatalogCommand build();
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
import static org.apache.ignite.sql.ColumnType.INT32;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.hasSize;
Expand Down Expand Up @@ -53,6 +54,48 @@ public void testCreateSchema() {
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA).build()),
willThrowFast(CatalogValidationException.class, "Schema with name 'S1' already exists")
);

assertThat(
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA).ifNotExists(true).build()),
willSucceedFast()
);
}

@Test
public void testCreateSchemaIfNotExists() {
{
assertThat(
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA).ifNotExists(false).build()),
willCompleteSuccessfully()
);

Catalog latestCatalog = latestCatalog();

assertNotNull(latestCatalog.schema(TEST_SCHEMA));
assertNotNull(latestCatalog.schema(SqlCommon.DEFAULT_SCHEMA_NAME));

assertThat(
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA).build()),
willThrowFast(CatalogValidationException.class, "Schema with name 'S1' already exists")
);
}

{
assertThat(
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA + "_1").ifNotExists(false).build()),
willCompleteSuccessfully()
);

Catalog latestCatalog = latestCatalog();

assertNotNull(latestCatalog.schema(TEST_SCHEMA + "_1"));
assertNotNull(latestCatalog.schema(TEST_SCHEMA));

assertThat(
manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA + "_1").ifNotExists(true).build()),
willSucceedFast()
);
}
}

@Test
Expand All @@ -75,6 +118,21 @@ public void testDropEmpty() {
);
}

@Test
public void testDropIfExists() {
assertThat(manager.execute(DropSchemaCommand.builder().name(TEST_SCHEMA).ifExists(true).build()), willCompleteSuccessfully());

assertThat(
manager.execute(DropSchemaCommand.builder().name(TEST_SCHEMA).ifExists(false).build()),
willThrowFast(CatalogValidationException.class, "Schema with name 'S1' not found")
);

assertThat(manager.execute(CreateSchemaCommand.builder().name(TEST_SCHEMA).build()), willCompleteSuccessfully());

assertThat(manager.execute(DropSchemaCommand.builder().name(TEST_SCHEMA).ifExists(true).build()), willCompleteSuccessfully());
assertThat(latestCatalog().schema(TEST_SCHEMA), nullValue());
}

@Test
public void testDropDefaultSchemaIsAllowed() {
CatalogCommand cmd = DropSchemaCommand.builder().name(SqlCommon.DEFAULT_SCHEMA_NAME).build();
Expand Down
Loading

0 comments on commit d311a3a

Please sign in to comment.