Skip to content

Commit b7d889e

Browse files
Braalfa and kukushking authored
fix: do not remove new columns values (#3181)
* fix: do not remove new columns values
* Expand athena iceberg add columns test to check values
* Add missing blank line

---------

Co-authored-by: Anton Kukushkin <[email protected]>
1 parent 286535f commit b7d889e

File tree

2 files changed

+13
-8
lines changed

2 files changed

+13
-8
lines changed

awswrangler/athena/_write_iceberg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,7 @@ def to_iceberg( # noqa: PLR0913
548548

549549
# Ensure that the ordering of the DF is the same as in the catalog.
550550
# This is required for the INSERT command to work.
551-
df = df[catalog_cols]
551+
df = df[catalog_cols + [col_name for col_name, _ in schema_differences["new_columns"].items()]]
552552

553553
if schema_evolution is False and any([schema_differences[x] for x in schema_differences]): # type: ignore[literal-required]
554554
raise exceptions.InvalidArgumentValue(f"Schema change detected: {schema_differences}")

tests/unit/test_athena_iceberg.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ def test_athena_to_iceberg_overwrite_partitions_merge_cols_error(
325325
def test_athena_to_iceberg_schema_evolution_add_columns(
326326
path: str, path2: str, glue_database: str, glue_table: str
327327
) -> None:
328-
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [3, 4, 5]})
328+
df = pd.DataFrame({"c0": [0, 1, 2], "c1": [6, 7, 8]})
329329
wr.athena.to_iceberg(
330330
df=df,
331331
database=glue_database,
@@ -336,9 +336,9 @@ def test_athena_to_iceberg_schema_evolution_add_columns(
336336
schema_evolution=True,
337337
)
338338

339-
df["c2"] = [6, 7, 8]
339+
df2 = pd.DataFrame({"c0": [3, 4, 5], "c2": [9, 10, 11]})
340340
wr.athena.to_iceberg(
341-
df=df,
341+
df=df2,
342342
database=glue_database,
343343
table=glue_table,
344344
table_location=path,
@@ -348,20 +348,25 @@ def test_athena_to_iceberg_schema_evolution_add_columns(
348348
)
349349

350350
column_types = wr.catalog.get_table_types(glue_database, glue_table)
351-
assert len(column_types) == len(df.columns)
351+
assert len(column_types) == 3
352352

353353
df_out = wr.athena.read_sql_table(
354354
table=glue_table,
355355
database=glue_database,
356356
ctas_approach=False,
357357
unload_approach=False,
358358
)
359-
assert len(df_out) == len(df) * 2
360359

361-
df["c3"] = [9, 10, 11]
360+
df_expected = pd.DataFrame(
361+
{"c0": [0, 1, 2, 3, 4, 5], "c1": [6, 7, 8, np.nan, np.nan, np.nan], "c2": [np.nan, np.nan, np.nan, 9, 10, 11]},
362+
dtype="Int64",
363+
)
364+
assert_pandas_equals(df_out.sort_values(by=["c0"]).reset_index(drop=True), df_expected)
365+
366+
df3 = pd.DataFrame({"c0": [12], "c1": [13], "c2": [14], "c3": [15]})
362367
with pytest.raises(wr.exceptions.InvalidArgumentValue):
363368
wr.athena.to_iceberg(
364-
df=df,
369+
df=df3,
365370
database=glue_database,
366371
table=glue_table,
367372
table_location=path,

0 commit comments

Comments
 (0)