带有psycopg2模块的PythonPostgreSQL教程展示了如何使用psycopg2模块在Python中对PostgreSQL数据库进行编程。
PostgreSQL
PostgreSQL是一个功能强大的开源对象关系数据库系统。它是一个多用户数据库管理系统。它运行在多个平台上,包括Linux、FreeBSD、Solaris、MicrosoftWindows和MacOSX。PostgreSQL由PostgreSQL全球开发组开发。
psycopg2模块
PostgreSQL有几个Python库。语。在本教程中,我们使用psycopg2
模块。它是Python编程语言的PostgreSQL数据库适配器。它主要在C中作为libpq
包装器实现。
$ pip install psycopg2
我们安装了psycopg2
模块。
PythonPsycopg2版本示例
在第一个代码示例中,我们获取了PostgreSQL数据库的版本。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys con = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() cur.execute('SELECT version()') version = cur.fetchone()[0] print(version) except psycopg2.DatabaseError as e: print(f'Error {e}') sys.exit(1) finally: if con: con.close()
在程序中,我们连接到先前创建的testdb
数据库。我们执行一条返回PostgreSQL数据库版本的SQL语句。
import psycopg2
psycopg2
是一个用于处理PostgreSQL数据库的Python模块。
con = None
我们将con变量初始化为None
。万一我们无法创建到数据库的连接(例如磁盘已满),我们就不会定义连接变量。这将导致finally子句中出现错误。
con = psycopg2.connect(database='testdb', user='postgres', password='s$cret')
connect
方法创建一个新的数据库会话并返回一个连接对象。用户是在没有密码的情况下创建的。在本地主机上,我们可以省略密码选项。否则,必须指定。
cur = con.cursor() cur.execute('SELECT version()')
从连接中,我们得到游标对象。游标用于遍历结果集中的记录。我们调用游标的execute
方法,执行SQL语句。
version = cur.fetchone()[0]
我们获取数据。由于我们只检索一条记录,因此我们调用了fetchone
方法。
print(version)
我们将检索到的数据打印到控制台。
except psycopg2.DatabaseError as e: print(f'Error {e}') sys.exit(1)
如果出现异常,我们会打印一条错误消息并使用错误代码1退出程序。
finally: if con: con.close())
在最后一步,我们释放资源。
$ version.py PostgreSQL 11.1, compiled by Visual C++ build 1914, 64-bit
在第二个示例中,我们再次获取PostgreSQL数据库的版本。这次我们使用with
关键字。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute('SELECT version()') version = cur.fetchone()[0] print(version)
程序返回PostgreSQL数据库的当前版本。使用with关键字。代码更紧凑。
with con:
使用with
关键字,Python会自动释放资源。它还提供错误处理。
Pythonpsycopg2执行
我们创建cars
表并向其中插入几行。execute
执行数据库操作(查询或命令)。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("DROP TABLE IF EXISTS cars") cur.execute("CREATE TABLE cars(id SERIAL PRIMARY KEY, name VARCHAR(255), price INT)") cur.execute("INSERT INTO cars(name, price) VALUES('Audi', 52642)") cur.execute("INSERT INTO cars(name, price) VALUES('Mercedes', 57127)") cur.execute("INSERT INTO cars(name, price) VALUES('Skoda', 9000)") cur.execute("INSERT INTO cars(name, price) VALUES('Volvo', 29000)") cur.execute("INSERT INTO cars(name, price) VALUES('Bentley', 350000)") cur.execute("INSERT INTO cars(name, price) VALUES('Citroen', 21000)") cur.execute("INSERT INTO cars(name, price) VALUES('Hummer', 41400)") cur.execute("INSERT INTO cars(name, price) VALUES('Volkswagen', 21600)")
程序创建cars
表并向表中插入八行。
cur.execute("CREATE TABLE cars(id SERIAL PRIMARY KEY, name VARCHAR(20), price INT)")
此SQL语句创建一个新的cars
表。该表有三列。
cur.execute("INSERT INTO cars(name, price) VALUES('Audi', 52642)") cur.execute("INSERT INTO cars(name, price) VALUES('Mercedes', 57127)")
这两行将两辆汽车插入表中。
$ psql -U postgres testdb psql (11.1) Type "help" for help. testdb=# SELECT * FROM cars; id | name | price ----+------------+-------- 1 | Audi | 52642 2 | Mercedes | 57127 3 | Skoda | 9000 4 | Volvo | 29000 5 | Bentley | 350000 6 | Citroen | 21000 7 | Hummer | 41400 8 | Volkswagen | 21600 (8 rows)
我们用psql
工具验证写入的数据。
Pythonpsycopg2executemany
executemany
方法是一种针对在提供的序列中找到的所有参数元组或映射启动数据库操作(查询或命令)的便捷方法。该函数主要用于更新数据库的命令:查询返回的任何结果集都将被丢弃。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 cars = ( (1, 'Audi', 52642), (2, 'Mercedes', 57127), (3, 'Skoda', 9000), (4, 'Volvo', 29000), (5, 'Bentley', 350000), (6, 'Citroen', 21000), (7, 'Hummer', 41400), (8, 'Volkswagen', 21600) ) con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("DROP TABLE IF EXISTS cars") cur.execute("CREATE TABLE cars(id SERIAL PRIMARY KEY, name VARCHAR(255), price INT)") query = "INSERT INTO cars (id, name, price) VALUES (%s, %s, %s)" cur.executemany(query, cars) con.commit()
此示例删除cars
表(如果存在)并(重新)创建它。
cur.execute("DROP TABLE IF EXISTS cars") cur.execute("CREATE TABLE cars(id SERIAL PRIMARY KEY, name VARCHAR(255), price INT)")
第一个SQL语句删除cars
表(如果存在)。第二个SQL语句创建cars
表。
query = "INSERT INTO cars (id, name, price) VALUES (%s, %s, %s)"
这是我们使用的查询。
cur.executemany(query, cars)
我们使用方便的executemany
方法将八行插入表中。该方法的第一个参数是参数化的SQL语句。第二个参数是数据,以元组的元组形式存在。
Pythonpsycopg2最后插入的行id
psycopg2
不支持lastrowid
属性。要返回最后插入行的id,我们必须使用PostgreSQL的RETURNINGid
条款。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("DROP TABLE IF EXISTS words") cur.execute("CREATE TABLE words(id SERIAL PRIMARY KEY, word VARCHAR(255))") cur.execute("INSERT INTO words(word) VALUES('forest') RETURNING id") cur.execute("INSERT INTO words(word) VALUES('cloud') RETURNING id") cur.execute("INSERT INTO words(word) VALUES('valley') RETURNING id") last_row_id = cur.fetchone()[0] print(f"The last Id of the inserted row is {last_row_id}")
程序创建一个新的words
表并打印最后插入行的ID。
$ lastrowid.py The last Id of the inserted row is 3
Pythonpsycopg2fetchall
fetchall
获取查询结果的所有(剩余)行,将它们作为元组列表返回。如果没有更多记录可获取,则返回一个空列表。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("SELECT * FROM cars") rows = cur.fetchall() for row in rows: print(f"{row[0]} {row[1]} {row[2]}")
在此示例中,我们从cars
表中检索所有数据。
cur.execute("SELECT * FROM cars")
此SQL语句从cars
表中选择所有数据。
rows = cur.fetchall()
fetchall
方法获取所有记录。它返回一个结果集。从技术上讲,它是元组的元组。每个内部元组代表表中的一行。
for row in rows: print(f"{row[0]} {row[1]} {row[2]}")
我们将数据逐行打印到控制台。
$ fetch_all.py 1 Audi 52642 2 Mercedes 57127 3 Skoda 9000 4 Volvo 29000 5 Bentley 350000 6 Citroen 21000 7 Hummer 41400 8 Volkswagen 21600
Pythonpsycopg2fetchone
fetchone
返回查询结果集的下一行,返回单个元组,或者当没有更多数据可用时返回None
。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("SELECT * FROM cars") while True: row = cur.fetchone() if row == None: break print(f"{row[0]} {row[1]} {row[2]}")
在本例中,我们连接到数据库并逐行获取cars
表的行。
while True:
我们从while循环访问数据。当我们读取最后一行时,循环终止。
row = cur.fetchone() if row == None: break
fetchone
方法返回表中的下一行。如果没有更多数据剩余,则返回None
。在这种情况下,我们中断循环。
print(f"{row[0]} {row[1]} {row[2]}")
数据以元组的形式返回。这里我们从元组中选择记录。第一个是Id,第二个是车名,第三个是车价。
Pythonpsycopg2字典游标
默认游标在元组的元组中检索数据。使用字典游标,数据以Python字典的形式发送。然后我们可以通过列名引用数据。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import psycopg2.extras con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cursor = con.cursor(cursor_factory=psycopg2.extras.DictCursor) cursor.execute("SELECT * FROM cars") rows = cursor.fetchall() for row in rows: print(f"{row['id']} {row['name']} {row['price']}")
在此示例中,我们使用字典游标打印cars
表的内容。
import psycopg2.extras
字典光标位于extras模块中。
cursor = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
我们创建一个DictCursor
。
for row in rows: print(f"{row['id']} {row['name']} {row['price']}")
数据通过列名访问。列名在PostgreSQL中被折叠为小写(除非引用)并且区分大小写。因此,我们必须以小写形式提供列名。
Pythonpsycopg2参数化查询
当我们使用参数化查询时,我们使用占位符而不是直接将值写入语句。参数化查询提高了安全性和性能。
Pythonpsycopg2
模块支持两种类型的占位符:ANSICprintf格式和Python扩展格式。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') uId = 1 uPrice = 62300 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("UPDATE cars SET price=%s WHERE id=%s", (uPrice, uId)) print(f"Number of rows updated: {cur.rowcount}")
我们更新了一辆汽车的价格。在此代码示例中,我们使用问号占位符。
cur.execute("UPDATE cars SET price=%s WHERE id=%s", (uPrice, uId))
字符(%s)是值的占位符。这些值被添加到占位符。
print(f"Number of rows updated: {cur.rowcount}")
rowcount
属性返回更新的行数。在我们的例子中,更新了一行。
$ parameterized_query.py Number of rows updated: 1 testdb=> SELECT * FROM cars WHERE id=1; id | name | price ----+------+------- 1 | Audi | 62300 (1 row)
汽车价格已更新。我们使用psql
工具检查更改。
第二个示例使用具有Python扩展格式的参数化语句。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 uid = 3 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("SELECT * FROM cars WHERE id=%(id)s", {'id': uid } ) row = cur.fetchone() print(f'{row[0]} {row[1]} {row[2]}')
我们使用pyformat
参数化语句选择汽车的名称和价格。
cur.execute("SELECT * FROM cars WHERE id=%(id)s", {'id': uid } )
命名占位符以冒号字符开头。
$ parameterized_query2.py 3 Skoda 9000
Pythonpsycopg2修改
mogrify
是PythonDBAPI的psycopg2扩展,它在参数绑定后返回查询字符串。返回的字符串正是将发送到运行execute
方法或类似方法的数据库的字符串。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = None with con: cur = con.cursor() print(cur.mogrify("SELECT name, price FROM cars WHERE id=%s", (2,))) # cur.execute("SELECT name, price FROM cars WHERE id=%s", (2,) ) # row = cur.fetchone() # print(f"{row[0]} {row[1]}")
程序在使用mogrify
绑定参数后显示一个SELECT查询字符串。
$ mogrify.py b'SELECT name, price FROM cars WHERE id=2'
Pythonpsycopg2插入图片
在本节中,我们将向PostgreSQL数据库中插入一个图像。
testdb=> CREATE TABLE images(id SERIAL PRIMARY KEY, data BYTEA);
对于这个例子,我们创建一个名为images
的新表。对于图像,我们使用BYTEA
数据类型。它允许存储二进制字符串。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys def readImage(): fin = None try: fin = open("sid.jpg", "rb") img = fin.read() return img except IOError as e: print(f'Error {e.args[0]}, {e.args[1]}') sys.exit(1) finally: if fin: fin.close() con = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() data = readImage() binary = psycopg2.Binary(data) cur.execute("INSERT INTO images(data) VALUES (%s)", (binary,)) con.commit() except psycopg2.DatabaseError as e: if con: con.rollback() print(f'Error {e}') sys.exit(1) finally: if con: con.close()
在程序中,我们从当前工作目录中读取一张图片,写入到PostgreSQLtestdb
数据库的images
表中。
try: fin = open("sid.jpg", "rb") img = fin.read() return img
我们从文件系统中读取二进制数据。我们有一个名为sid.jpg
的JPEG图像。
binary = psycopg2.Binary(data)
数据使用psycopg2
Binary
对象进行编码。
cur.execute("INSERT INTO images(data) VALUES (%s)", (binary,))
此SQL语句用于将图像插入数据库。
Pythonpsycopg2读取图像
在本节中,我们将执行反向操作。我们从数据库表中读取图像。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys def writeImage(data): fout = None try: fout = open('sid2.jpg', 'wb') fout.write(data) except IOError as e: print(f"Error {0}") sys.exit(1) finally: if fout: fout.close() try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() cur.execute("SELECT data FROM images LIMIT 1") data = cur.fetchone()[0] writeImage(data) except psycopg2.DatabaseError as e: print(f'Error {e}') sys.exit(1) finally: if con: con.close()
我们从images表中读取图像数据并将其写入另一个文件,我们称之为sid2.jpg
。
try: fout = open('sid2.jpg', 'wb') fout.write(data)
我们以写入方式打开一个二进制文件。将数据库中的数据写入文件。
cur.execute("SELECT data FROM images LIMIT 1") data = cur.fetchone()[0]
这两行从images
表中选择并获取数据。我们从第一行获取二进制数据。
PythonPostgreSQL元数据
元数据是有关数据库中数据的信息。PostgreSQL数据库中的元数据包含有关我们存储数据的表和列的信息。受SQL语句影响的行数是元数据。结果集中返回的行数和列数也属于元数据。
可以使用游标对象的description
属性或information_schema
表获取PostgreSQL中的元数据。
接下来我们打印cars
表中的所有行及其列名。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute('SELECT * FROM cars') col_names = [cn[0] for cn in cur.description] rows = cur.fetchall() print(f'{col_names[0]} {col_names[1]} {col_names[2]}')
我们将cars
表的内容打印到控制台。现在,我们也包括列的名称。
col_names = [cn[0] for cn in cur.description]
我们从游标对象的description
属性中获取列名。
print(f'{col_names[0]} {col_names[1]} {col_names[2]}')
此行打印cars
表的三个列名称。
我们使用for循环打印行。数据与列名对齐。
$ column_names.py id name price
在下面的示例中,我们列出了testdb
数据库中的所有表。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') with con: cur = con.cursor() cur.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'""") rows = cur.fetchall() for row in rows: print(row[0])
代码示例将当前数据库中的所有可用表打印到终端。
cur.execute("""SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'""")
表名存储在系统information_schema
表中。
$ list_tables.py cars countries projects employees users tasks images
这些是我们系统上的表。
Pythonpsycopg2导出导入数据
我们可以使用copy_to
和copy_from
导出和导入数据。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys con = None fout = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() fout = open('cars.csv', 'w') cur.copy_to(fout, 'cars', sep="|") except psycopg2.DatabaseError as e: print(f'Error {e}') sys.exit(1) except IOError as e: print(f'Error {e}') sys.exit(1) finally: if con: con.close() if fout: fout.close()
代码示例将数据从cars
表复制到cars.csv
文件中。
fout = open('cars.csv', 'w')
我们打开一个文件,在其中写入cars
表中的数据。
cur.copy_to(fout, 'cars', sep="|")
copy_to
方法将数据从cars
表复制到打开的文件中。列用|
字符分隔。
$ cat cars.csv 2|Mercedes|57127 3|Skoda|9000 4|Volvo|29000 5|Bentley|350000 6|Citroen|21000 7|Hummer|41400 8|Volkswagen|21600 1|Audi|62300
这些是cars
文件的内容。
现在我们要进行反向操作。我们将转储的表导入回数据库表。
testdb=> DELETE FROM cars; DELETE 8
我们从cars
表中删除数据。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys con = None f = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() f = open('cars.csv', 'r') cur.copy_from(f, 'cars', sep="|") con.commit() except psycopg2.DatabaseError as e: if con: con.rollback() print(f'Error {e}') sys.exit(1) except IOError as e: if con: con.rollback() print(f'Error {e}') sys.exit(1) finally: if con: con.close() if f: f.close()
在程序中我们读取了cars
文件的内容并将其复制回cars表。
f = open('cars.csv', 'r') cur.copy_from(f, 'cars', sep="|") con.commit()
我们打开cars.csv
文件进行读取,并将内容复制到cars
表中。更改已提交。
SELECT * FROM cars; id | name | price ----+------------+-------- 2 | Mercedes | 57127 3 | Skoda | 9000 4 | Volvo | 29000 5 | Bentley | 350000 6 | Citroen | 21000 7 | Hummer | 41400 8 | Volkswagen | 21600 1 | Audi | 62300 (8 rows)
输出显示我们已经成功地重新创建了保存的cars
表。
Pythonpsycopg2交易
事务是对一个或多个数据库中的数据进行数据库操作的原子单元。一个事务中所有SQL语句的效果可以全部提交到数据库,也可以全部回滚。
在psycopg2模块中,事务由连接类处理。连接游标的第一个命令启动一个事务。(我们不需要将SQL命令包含在BEGIN
和END
语句中来创建事务。这由psycopg2
自动处理。)以下命令是在此新事务的上下文中执行。如果出现错误,事务将中止并且不会执行进一步的命令,直到rollback
方法。
psycopg2
模块的文档说连接负责终止其事务,调用commit
或rollback
方法。提交的更改会立即持久化到数据库中。使用close
方法关闭连接或销毁连接对象(使用del或让它超出范围)将导致隐式rollback
调用。
psycopg2
模块还支持自动提交模式,其中对表的所有更改都会立即生效。要在自动提交模式下运行,我们设置连接的autocommit
属性对象为True
。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys con = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') cur = con.cursor() cur.execute("DROP TABLE IF EXISTS friends") cur.execute("CREATE TABLE friends(id SERIAL PRIMARY KEY, name VARCHAR(255))") cur.execute("INSERT INTO friends(name) VALUES ('Tom')") cur.execute("INSERT INTO friends(name) VALUES ('Rebecca')") cur.execute("INSERT INTO friends(name) VALUES ('Jim')") cur.execute("INSERT INTO friends(name) VALUES ('Robert')") con.commit() except psycopg2.DatabaseError as e: if con: con.rollback() print('Error {e}') sys.exit(1) finally: if con: con.close()
我们创建friends
表并尝试用数据填充它。然而,正如我们将看到的,数据将不会被提交。
#con.commit()
commit
方法被注释了。如果我们取消注释该行,数据将被写入表中。
finally: if con: con.close()
finally
块总是被执行。如果我们没有提交更改并且没有发生错误(这将回滚更改)事务仍然打开。使用close
方法关闭连接,并通过对rollback
方法的隐式调用终止事务。
testdb=# \dt List of relations Schema | Name | Type | Owner --------+-----------+-------+---------- public | cars | table | postgres public | countries | table | postgres public | employees | table | postgres public | images | table | postgres public | projects | table | postgres public | tasks | table | postgres public | users | table | postgres (7 rows)
只有在我们取消注释该行之后,才会创建friends
表。
Pythonpsycopg2自动提交
在自动提交模式下,立即执行一条SQL语句。
#!/usr/bin/python # -*- coding: utf-8 -*- import psycopg2 import sys con = None try: con = psycopg2.connect(database='testdb', user='postgres', password='s$cret') con.autocommit = True cur = con.cursor() cur.execute("DROP TABLE IF EXISTS friends") cur.execute("CREATE TABLE friends(id serial PRIMARY KEY, name VARCHAR(10))") cur.execute("INSERT INTO friends(name) VALUES ('Jane')") cur.execute("INSERT INTO friends(name) VALUES ('Tom')") cur.execute("INSERT INTO friends(name) VALUES ('Rebecca')") cur.execute("INSERT INTO friends(name) VALUES ('Jim')") cur.execute("INSERT INTO friends(name) VALUES ('Robert')") cur.execute("INSERT INTO friends(name) VALUES ('Patrick')") except psycopg2.DatabaseError as e: print(f'Error {e}') sys.exit(1) finally: if con: con.close()
在这个例子中,我们以自动提交模式连接到数据库。我们既不调用commit
也不调用rollback
方法。
con.autocommit = True
我们将连接设置为自动提交模式。
$ autocommit.py testdb=# select * from friends; id | name ----+--------- 1 | Jane 2 | Tom 3 | Rebecca 4 | Jim 5 | Robert 6 | Patrick (6 rows)
数据已成功提交到friends
表。
访问Python教程或列出所有Python教程。