diff --git a/pandas/tests/io/test_sql.py b/pandas/tests/io/test_sql.py index cb8ee4891a41e..2f988d825d9db 100644 --- a/pandas/tests/io/test_sql.py +++ b/pandas/tests/io/test_sql.py @@ -58,6 +58,7 @@ SQLiteDatabase, _gt14, get_engine, + pandasSQL_builder, read_sql_query, read_sql_table, ) @@ -248,6 +249,13 @@ def create_and_load_types(conn, types_data: list[dict], dialect: str): conn.execute(stmt) +def check_iris_frame(frame: DataFrame): + pytype = frame.dtypes[0].type + row = frame.iloc[0] + assert issubclass(pytype, np.floating) + tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"]) + + def count_rows(conn, table_name: str): stmt = f"SELECT count(*) AS count_1 FROM {table_name}" if isinstance(conn, sqlite3.Connection): @@ -367,6 +375,271 @@ def test_frame3(): return DataFrame(data, columns=columns) +@pytest.fixture +def mysql_pymysql_engine(iris_path, types_data): + sqlalchemy = pytest.importorskip("sqlalchemy") + pymysql = pytest.importorskip("pymysql") + engine = sqlalchemy.create_engine( + "mysql+pymysql://root@localhost:3306/pandas", + connect_args={"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS}, + ) + check_target = sqlalchemy.inspect(engine) if _gt14() else engine + if not check_target.has_table("iris"): + create_and_load_iris(engine, iris_path, "mysql") + if not check_target.has_table("types"): + for entry in types_data: + entry.pop("DateColWithTz") + create_and_load_types(engine, types_data, "mysql") + yield engine + with engine.connect() as conn: + with conn.begin(): + stmt = sqlalchemy.text("DROP TABLE IF EXISTS test_frame;") + conn.execute(stmt) + engine.dispose() + + +@pytest.fixture +def mysql_pymysql_conn(mysql_pymysql_engine): + yield mysql_pymysql_engine.connect() + + +@pytest.fixture +def postgresql_psycopg2_engine(iris_path, types_data): + sqlalchemy = pytest.importorskip("sqlalchemy") + pytest.importorskip("psycopg2") + engine = sqlalchemy.create_engine( + "postgresql+psycopg2://postgres:postgres@localhost:5432/pandas" + ) + check_target = sqlalchemy.inspect(engine) if _gt14() else engine + if not check_target.has_table("iris"): + create_and_load_iris(engine, iris_path, "postgresql") + if not check_target.has_table("types"): + create_and_load_types(engine, types_data, "postgresql") + yield engine + with engine.connect() as conn: + with conn.begin(): + stmt = sqlalchemy.text("DROP TABLE IF EXISTS test_frame;") + conn.execute(stmt) + engine.dispose() + + +@pytest.fixture +def postgresql_psycopg2_conn(postgresql_psycopg2_engine): + yield postgresql_psycopg2_engine.connect() + + +@pytest.fixture +def sqlite_engine(): + sqlalchemy = pytest.importorskip("sqlalchemy") + engine = sqlalchemy.create_engine("sqlite://") + yield engine + engine.dispose() + + +@pytest.fixture +def sqlite_conn(sqlite_engine): + yield sqlite_engine.connect() + + +@pytest.fixture +def sqlite_iris_engine(sqlite_engine, iris_path): + create_and_load_iris(sqlite_engine, iris_path, "sqlite") + return sqlite_engine + + +@pytest.fixture +def sqlite_iris_conn(sqlite_iris_engine): + yield sqlite_iris_engine.connect() + + +@pytest.fixture +def sqlite_buildin(): + conn = sqlite3.connect(":memory:") + yield conn + conn.close() + + +@pytest.fixture +def sqlite_buildin_iris(sqlite_buildin, iris_path): + create_and_load_iris_sqlite3(sqlite_buildin, iris_path) + return sqlite_buildin + + +mysql_connectable = [ + "mysql_pymysql_engine", + "mysql_pymysql_conn", +] + + +postgresql_connectable = [ + "postgresql_psycopg2_engine", + "postgresql_psycopg2_conn", +] + +sqlite_connectable = [ + "sqlite_engine", + "sqlite_conn", +] + +sqlite_iris_connectable = [ + "sqlite_iris_engine", + "sqlite_iris_conn", +] + +sqlalchemy_connectable = mysql_connectable + postgresql_connectable + sqlite_connectable + +sqlalchemy_connectable_iris = ( + mysql_connectable + postgresql_connectable + sqlite_iris_connectable +) + +all_connectable = sqlalchemy_connectable + ["sqlite_buildin"] + +all_connectable_iris = sqlalchemy_connectable_iris + ["sqlite_buildin_iris"] + + +@pytest.mark.parametrize("conn", all_connectable) +@pytest.mark.parametrize("method", [None, "multi"]) +def test_to_sql(conn, method, test_frame1, request): + conn = request.getfixturevalue(conn) + pandasSQL = pandasSQL_builder(conn) + pandasSQL.to_sql(test_frame1, "test_frame", method=method) + assert pandasSQL.has_table("test_frame") + assert count_rows(conn, "test_frame") == len(test_frame1) + + +@pytest.mark.parametrize("conn", all_connectable) +@pytest.mark.parametrize("mode, num_row_coef", [("replace", 1), ("append", 2)]) +def test_to_sql_exist(conn, mode, num_row_coef, test_frame1, request): + conn = request.getfixturevalue(conn) + pandasSQL = pandasSQL_builder(conn) + pandasSQL.to_sql(test_frame1, "test_frame", if_exists="fail") + pandasSQL.to_sql(test_frame1, "test_frame", if_exists=mode) + assert pandasSQL.has_table("test_frame") + assert count_rows(conn, "test_frame") == num_row_coef * len(test_frame1) + + +@pytest.mark.parametrize("conn", all_connectable) +def test_to_sql_exist_fail(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + pandasSQL = pandasSQL_builder(conn) + pandasSQL.to_sql(test_frame1, "test_frame", if_exists="fail") + assert pandasSQL.has_table("test_frame") + + msg = "Table 'test_frame' already exists" + with pytest.raises(ValueError, match=msg): + pandasSQL.to_sql(test_frame1, "test_frame", if_exists="fail") + + +@pytest.mark.parametrize("conn", all_connectable_iris) +def test_read_iris(conn, request): + conn = request.getfixturevalue(conn) + pandasSQL = pandasSQL_builder(conn) + iris_frame = pandasSQL.read_query("SELECT * FROM iris") + check_iris_frame(iris_frame) + + +@pytest.mark.parametrize("conn", sqlalchemy_connectable) +def test_to_sql_callable(conn, test_frame1, request): + conn = request.getfixturevalue(conn) + pandasSQL = pandasSQL_builder(conn) + + check = [] # used to double check function below is really being used + + def sample(pd_table, conn, keys, data_iter): + check.append(1) + data = [dict(zip(keys, row)) for row in data_iter] + conn.execute(pd_table.table.insert(), data) + + pandasSQL.to_sql(test_frame1, "test_frame", method=sample) + assert pandasSQL.has_table("test_frame") + assert check == [1] + assert count_rows(conn, "test_frame") == len(test_frame1) + + +@pytest.mark.parametrize("conn", mysql_connectable) +def test_default_type_conversion(conn, request): + conn = request.getfixturevalue(conn) + df = sql.read_sql_table("types", conn) + + assert issubclass(df.FloatCol.dtype.type, np.floating) + assert issubclass(df.IntCol.dtype.type, np.integer) + + # MySQL has no real BOOL type (it's an alias for TINYINT) + assert issubclass(df.BoolCol.dtype.type, np.integer) + + # Int column with NA values stays as float + assert issubclass(df.IntColWithNull.dtype.type, np.floating) + + # Bool column with NA = int column with NA values => becomes float + assert issubclass(df.BoolColWithNull.dtype.type, np.floating) + + +@pytest.mark.parametrize("conn", mysql_connectable) +def test_read_procedure(conn, request): + conn = request.getfixturevalue(conn) + + # GH 7324 + # Although it is more an api test, it is added to the + # mysql tests as sqlite does not have stored procedures + from sqlalchemy import text + from sqlalchemy.engine import Engine + + df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) + df.to_sql("test_frame", conn, index=False) + + proc = """DROP PROCEDURE IF EXISTS get_testdb; + + CREATE PROCEDURE get_testdb () + + BEGIN + SELECT * FROM test_frame; + END""" + proc = text(proc) + if isinstance(conn, Engine): + with conn.connect() as engine_conn: + with engine_conn.begin(): + engine_conn.execute(proc) + else: + conn.execute(proc) + + res1 = sql.read_sql_query("CALL get_testdb();", conn) + tm.assert_frame_equal(df, res1) + + # test delegation to read_sql_query + res2 = sql.read_sql("CALL get_testdb();", conn) + tm.assert_frame_equal(df, res2) + + +@pytest.mark.parametrize("conn", postgresql_connectable) +def test_copy_from_callable_insertion_method(conn, request): + # GH 8953 + # Example in io.rst found under _io.sql.method + # not available in sqlite, mysql + def psql_insert_copy(table, conn, keys, data_iter): + # gets a DBAPI connection that can provide a cursor + dbapi_conn = conn.connection + with dbapi_conn.cursor() as cur: + s_buf = StringIO() + writer = csv.writer(s_buf) + writer.writerows(data_iter) + s_buf.seek(0) + + columns = ", ".join([f'"{k}"' for k in keys]) + if table.schema: + table_name = f"{table.schema}.{table.name}" + else: + table_name = table.name + + sql_query = f"COPY {table_name} ({columns}) FROM STDIN WITH CSV" + cur.copy_expert(sql=sql_query, file=s_buf) + + conn = request.getfixturevalue(conn) + expected = DataFrame({"col1": [1, 2], "col2": [0.1, 0.2], "col3": ["a", "n"]}) + expected.to_sql("test_frame", conn, index=False, method=psql_insert_copy) + result = sql.read_sql_table("test_frame", conn) + tm.assert_frame_equal(result, expected) + + class MixInBase: def teardown_method(self, method): # if setup fails, there may not be a connection to close. @@ -376,26 +649,6 @@ def teardown_method(self, method): self._close_conn() -class MySQLMixIn(MixInBase): - def drop_table(self, table_name): - cur = self.conn.cursor() - cur.execute(f"DROP TABLE IF EXISTS {sql._get_valid_mysql_name(table_name)}") - self.conn.commit() - - def _get_all_tables(self): - cur = self.conn.cursor() - cur.execute("SHOW TABLES") - return [table[0] for table in cur.fetchall()] - - def _close_conn(self): - from pymysql.err import Error - - try: - self.conn.close() - except Error: - pass - - class SQLiteMixIn(MixInBase): def drop_table(self, table_name): self.conn.execute( @@ -454,113 +707,27 @@ def load_types_data(self, types_data): else: create_and_load_types(self.conn, types_data, self.flavor) - def _check_iris_loaded_frame(self, iris_frame): - pytype = iris_frame.dtypes[0].type - row = iris_frame.iloc[0] - - assert issubclass(pytype, np.floating) - tm.equalContents(row.values, [5.1, 3.5, 1.4, 0.2, "Iris-setosa"]) - - def _read_sql_iris(self): - iris_frame = self.pandasSQL.read_query("SELECT * FROM iris") - self._check_iris_loaded_frame(iris_frame) - def _read_sql_iris_parameter(self): query = SQL_STRINGS["read_parameters"][self.flavor] params = ["Iris-setosa", 5.1] iris_frame = self.pandasSQL.read_query(query, params=params) - self._check_iris_loaded_frame(iris_frame) + check_iris_frame(iris_frame) def _read_sql_iris_named_parameter(self): query = SQL_STRINGS["read_named_parameters"][self.flavor] params = {"name": "Iris-setosa", "length": 5.1} iris_frame = self.pandasSQL.read_query(query, params=params) - self._check_iris_loaded_frame(iris_frame) + check_iris_frame(iris_frame) def _read_sql_iris_no_parameter_with_percent(self): query = SQL_STRINGS["read_no_parameters_with_percent"][self.flavor] iris_frame = self.pandasSQL.read_query(query, params=None) - self._check_iris_loaded_frame(iris_frame) - - def _to_sql(self, test_frame1, method=None): - self.drop_table("test_frame1") - - self.pandasSQL.to_sql(test_frame1, "test_frame1", method=method) - assert self.pandasSQL.has_table("test_frame1") - - num_entries = len(test_frame1) - num_rows = count_rows(self.conn, "test_frame1") - assert num_rows == num_entries - - # Nuke table - self.drop_table("test_frame1") + check_iris_frame(iris_frame) def _to_sql_empty(self, test_frame1): self.drop_table("test_frame1") self.pandasSQL.to_sql(test_frame1.iloc[:0], "test_frame1") - def _to_sql_fail(self, test_frame1): - self.drop_table("test_frame1") - - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="fail") - assert self.pandasSQL.has_table("test_frame1") - - msg = "Table 'test_frame1' already exists" - with pytest.raises(ValueError, match=msg): - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="fail") - - self.drop_table("test_frame1") - - def _to_sql_replace(self, test_frame1): - self.drop_table("test_frame1") - - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="fail") - # Add to table again - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="replace") - assert self.pandasSQL.has_table("test_frame1") - - num_entries = len(test_frame1) - num_rows = count_rows(self.conn, "test_frame1") - - assert num_rows == num_entries - self.drop_table("test_frame1") - - def _to_sql_append(self, test_frame1): - # Nuke table just in case - self.drop_table("test_frame1") - - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="fail") - - # Add to table again - self.pandasSQL.to_sql(test_frame1, "test_frame1", if_exists="append") - assert self.pandasSQL.has_table("test_frame1") - - num_entries = 2 * len(test_frame1) - num_rows = count_rows(self.conn, "test_frame1") - - assert num_rows == num_entries - self.drop_table("test_frame1") - - def _to_sql_method_callable(self, test_frame1): - check = [] # used to double check function below is really being used - - def sample(pd_table, conn, keys, data_iter): - check.append(1) - data = [dict(zip(keys, row)) for row in data_iter] - conn.execute(pd_table.table.insert(), data) - - self.drop_table("test_frame1") - - self.pandasSQL.to_sql(test_frame1, "test_frame1", method=sample) - assert self.pandasSQL.has_table("test_frame1") - - assert check == [1] - num_entries = len(test_frame1) - num_rows = count_rows(self.conn, "test_frame1") - assert num_rows == num_entries - # Nuke table - self.drop_table("test_frame1") - def _to_sql_with_sql_engine(self, test_frame1, engine="auto", **engine_kwargs): """`to_sql` with the `engine` param""" # mostly copied from this class's `_to_sql()` method @@ -675,13 +842,9 @@ def setup_method(self, load_iris_data, load_types_data): def load_test_data_and_sql(self): create_and_load_iris_view(self.conn) - def test_read_sql_iris(self): - iris_frame = sql.read_sql_query("SELECT * FROM iris", self.conn) - self._check_iris_loaded_frame(iris_frame) - def test_read_sql_view(self): iris_frame = sql.read_sql_query("SELECT * FROM iris_view", self.conn) - self._check_iris_loaded_frame(iris_frame) + check_iris_frame(iris_frame) def test_read_sql_with_chunksize_no_result(self): query = "SELECT * FROM iris_view WHERE SepalLength < 0.0" @@ -1442,36 +1605,15 @@ def setup_connect(self): except sqlalchemy.exc.OperationalError: pytest.skip(f"Can't connect to {self.flavor} server") - def test_read_sql(self): - self._read_sql_iris() - def test_read_sql_parameter(self): self._read_sql_iris_parameter() def test_read_sql_named_parameter(self): self._read_sql_iris_named_parameter() - def test_to_sql(self, test_frame1): - self._to_sql(test_frame1) - def test_to_sql_empty(self, test_frame1): self._to_sql_empty(test_frame1) - def test_to_sql_fail(self, test_frame1): - self._to_sql_fail(test_frame1) - - def test_to_sql_replace(self, test_frame1): - self._to_sql_replace(test_frame1) - - def test_to_sql_append(self, test_frame1): - self._to_sql_append(test_frame1) - - def test_to_sql_method_multi(self, test_frame1): - self._to_sql(test_frame1, method="multi") - - def test_to_sql_method_callable(self, test_frame1): - self._to_sql_method_callable(test_frame1) - def test_create_table(self): temp_conn = self.connect() temp_frame = DataFrame( @@ -1522,7 +1664,7 @@ def test_execute_sql(self): def test_read_table(self): iris_frame = sql.read_sql_table("iris", con=self.conn) - self._check_iris_loaded_frame(iris_frame) + check_iris_frame(iris_frame) def test_read_table_columns(self): iris_frame = sql.read_sql_table( @@ -2244,51 +2386,7 @@ def setup_driver(cls): cls.connect_args = {"client_flag": pymysql.constants.CLIENT.MULTI_STATEMENTS} def test_default_type_conversion(self): - df = sql.read_sql_table("types", self.conn) - - assert issubclass(df.FloatCol.dtype.type, np.floating) - assert issubclass(df.IntCol.dtype.type, np.integer) - - # MySQL has no real BOOL type (it's an alias for TINYINT) - assert issubclass(df.BoolCol.dtype.type, np.integer) - - # Int column with NA values stays as float - assert issubclass(df.IntColWithNull.dtype.type, np.floating) - - # Bool column with NA = int column with NA values => becomes float - assert issubclass(df.BoolColWithNull.dtype.type, np.floating) - - def test_read_procedure(self): - from sqlalchemy import text - from sqlalchemy.engine import Engine - - # GH 7324 - # Although it is more an api test, it is added to the - # mysql tests as sqlite does not have stored procedures - df = DataFrame({"a": [1, 2, 3], "b": [0.1, 0.2, 0.3]}) - df.to_sql("test_procedure", self.conn, index=False) - - proc = """DROP PROCEDURE IF EXISTS get_testdb; - - CREATE PROCEDURE get_testdb () - - BEGIN - SELECT * FROM test_procedure; - END""" - proc = text(proc) - if isinstance(self.conn, Engine): - with self.conn.connect() as conn: - with conn.begin(): - conn.execute(proc) - else: - self.conn.execute(proc) - - res1 = sql.read_sql_query("CALL get_testdb();", self.conn) - tm.assert_frame_equal(df, res1) - - # test delegation to read_sql_query - res2 = sql.read_sql("CALL get_testdb();", self.conn) - tm.assert_frame_equal(df, res2) + pass class _TestPostgreSQLAlchemy: @@ -2383,35 +2481,6 @@ def test_schema_support(self): res2 = pdsql.read_table("test_schema_other2") tm.assert_frame_equal(res1, res2) - def test_copy_from_callable_insertion_method(self): - # GH 8953 - # Example in io.rst found under _io.sql.method - # not available in sqlite, mysql - def psql_insert_copy(table, conn, keys, data_iter): - # gets a DBAPI connection that can provide a cursor - dbapi_conn = conn.connection - with dbapi_conn.cursor() as cur: - s_buf = StringIO() - writer = csv.writer(s_buf) - writer.writerows(data_iter) - s_buf.seek(0) - - columns = ", ".join([f'"{k}"' for k in keys]) - if table.schema: - table_name = f"{table.schema}.{table.name}" - else: - table_name = table.name - - sql_query = f"COPY {table_name} ({columns}) FROM STDIN WITH CSV" - cur.copy_expert(sql=sql_query, file=s_buf) - - expected = DataFrame({"col1": [1, 2], "col2": [0.1, 0.2], "col3": ["a", "n"]}) - expected.to_sql( - "test_copy_insert", self.conn, index=False, method=psql_insert_copy - ) - result = sql.read_sql_table("test_copy_insert", self.conn) - tm.assert_frame_equal(result, expected) - @pytest.mark.single @pytest.mark.db @@ -2471,34 +2540,15 @@ def setup_connect(self): def setup_method(self, load_iris_data, load_types_data): self.pandasSQL = sql.SQLiteDatabase(self.conn) - def test_read_sql(self): - self._read_sql_iris() - def test_read_sql_parameter(self): self._read_sql_iris_parameter() def test_read_sql_named_parameter(self): self._read_sql_iris_named_parameter() - def test_to_sql(self, test_frame1): - self._to_sql(test_frame1) - def test_to_sql_empty(self, test_frame1): self._to_sql_empty(test_frame1) - def test_to_sql_fail(self, test_frame1): - self._to_sql_fail(test_frame1) - - def test_to_sql_replace(self, test_frame1): - self._to_sql_replace(test_frame1) - - def test_to_sql_append(self, test_frame1): - self._to_sql_append(test_frame1) - - def test_to_sql_method_multi(self, test_frame1): - # GH 29921 - self._to_sql(test_frame1, method="multi") - def test_create_and_drop_table(self): temp_frame = DataFrame( {"one": [1.0, 2.0, 3.0, 4.0], "two": [4.0, 3.0, 2.0, 1.0]}