pymongo: MongoDB 官方提供的 Python 工具包。官方文档: https://pymongo.readthedocs.io/en/stable/ pip安装,命令如下:
pip install pymongo
管理 MongoDB 的主要步骤如下:
第一步,连接 MongoDB:
# 方式一: 使用默认的配置
client = MongoClient()
# 方式二: 指定主机地址和端口号
client = MongoClient('localhost', 27017)
# 方式三: 使用URI连接参数
client = MongoClient('mongodb://localhost:27017/')
第二步,管理数据库:
#通过MongoClient对象来管理多个数据库获取数据库(逻辑库)对象
db = client.DATABASE_NAME
db = client["DATABASE_NAME"]
db = client.get_database(name=None, *args)
# 查看所有的数据库(逻辑库)
client.list_databases()
# 删除数据库(逻辑库)
client.drop_database(name_or_database, *args)
第三步,管理集合
# 通过数据库对象db来管理集合
# 获取集合对象
db = client.DATABASE_NAME
db.COLLECTION_NAME
client.DATABASE_NAME.COLLECTION_NAME
db.get_collection(name, *args)
# 查看当前数据库下的集合列表
db.list_collection_names()
# 删除集合
db.drop_collection(name_or_collection, *args)
基础操作示例:
# -*- coding: utf-8 -*-
# @Time : 2023-03-17 1:47
# @Author : AmoXiang
# @File : 1.数据库连接.py
# @Software: PyCharm
# @Blog : https://blog.csdn.net/xw1680from pymongo import MongoClient# 使用默认配置连接到数据库
# client = MongoClient()
# print(client)
# client.close()
# 指定主机地址和端口号连接到数据库
# client = MongoClient('localhost', 27017)
# 使用URI连接参数连接到数据库
client = MongoClient('mongodb://localhost:27017/')
print(client)
# client.close()
#
# # 访问数据库
# db = client.test
# db = client["test"]
# print(db)
# db = client.get_database('test')
# client.close()
# print(db)
#
# 查看有哪些数据库
db_list = client.list_databases()
# # db_list = client.list_database_names()
for item in db_list:print(item)
#
# 查看数据库下有哪些集合
db_test = client["test"]
for item in db_test.list_collection_names():print(item)
#
# # 集合对象操作
# data = db_test.students.find_one()
# data = client.test.students.find_one()
data = client.test.get_collection('students').find_one()
print(data)
说明:pymongo 在插入数据时可以将 python 的对象转换成 BSON
insert_one():插入一个文档
# 调用方法
result = db.COLLECTION_NAME.insert_one(doc)
# 返回插入的文档ID
result.inserted _id
insert_many():批量新增文档。调用方法:
doc_list = [doc1,doc2]
result = db.COLLECTION_NAME.insert_many(doc_list)
示例:
# -*- coding: utf-8 -*-
# @Time : 2023-03-17 1:55
# @Author : AmoXiang
# @File : 2.插入数据.py
# @Software: PyCharm
# @Blog : https://blog.csdn.net/xw1680import pymongoclient = pymongo.MongoClient(host="localhost", port=27017)
db = client.temp
collection = db.students# 单条数据的插入
student1 = {"id": '20170101', "name": "AmoXiang", "age": 20, "gender": "male"}
result = collection.insert_one(student1)
print(result)
print(result.inserted_id)
# 多条数据的插入
student2 = {"id": '20170102', "name": "Jordan", "age": 21, "gender": "female"}
student3 = {"id": '20170103', "name": "Mike", "age": 22, "gender": "female"}
result = collection.insert_many([student2, student3])
print(result)
print(result.inserted_ids)
pymongo 可以将查询的结果转换成 python 中的对象
常用方法:
find_one(): 按条件查询一个文档
find(): 按条件查询多个文档
count_documents(): 统计满足条件的文档总数
aggregate(): 聚合统计
.sort(): 排序
.skip().limit(): 分页
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2023-03-18 15:03
# @Author : AmoXiang
# @File : 5.查询文档.py
# @Software: PyCharm
# @Blog : https://blog.csdn.net/xw1680from bson.objectid import ObjectId
from pymongo import MongoClient, ASCENDING, DESCENDINGclass LearnMongoDBSearch(object):""" MongoDB查询练习 """def __init__(self):self.client = MongoClient()def search_one(self):""" 查询一个文档 """temp_obj = self.client.test.newdb.find_one()print(temp_obj)print('喜欢分数:', temp_obj['likes'])# print('注册时间:', temp_obj['reg_date'].date())print('姓名:', temp_obj['uname'])def search_user_by_pk(self, pk):obj_id = ObjectId(pk)# user_obj = self.client.test.newdb.find_one({'_id': obj_id})# 面向对象的方法,有代码提示db = self.client.get_database('test')users = db.get_collection('newdb')user_obj = users.find_one({'_id': obj_id})print(user_obj)def search_many(self):""" 查询多个文档 """db = self.client.get_database('test')students = db.get_collection('students')# stu_list = students.find()# for item in stu_list:# print(item)# 查询年龄大于12岁的学生stu_list = students.find({'age': {'$gt': 12}}, {'stu_name': 1, 'class_name': 1, 'age': 1})for item in stu_list:# 注意: mongo中存储的整数转换成了浮点数print(item)def paginate(self, page=1, page_size=10):"""分页处理:param page: 当前的页:param page_size: 每一页数据大小:return:"""db = self.client.get_database('test')students = db.get_collection('students')offset = (page - 1) * page_sizestu_list = students.find().skip(offset).limit(page_size)return stu_listdef sort_data(self):""" 排序 """db = self.client.get_database('test')grades = db.get_collection('grades')# // 将学生的语文成绩从高到低排序# db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1});## //将学生的语文成绩按照年龄和成绩排序# db.grades.find({"grade.course_name": "语文"}).sort({"age": -1, "grade.score": -1});# db.grades.find({"grade.course_name": "语文"}).sort({"grade.score": -1, "age": -1});# 按某一列排序# data_list = grades.find({"grade.course_name": "语文"}).sort("grade.score", DESCENDING);# for item in data_list:# print(item)# 按多列排序data_list = grades.find({"grade.course_name": "语文"}).sort([('age', DESCENDING),("grade.score", DESCENDING),])for item in data_list:print(item)def counter_students(self):""" 统计newdb中文档总数 """db = self.client.get_database('test')newdb = db.get_collection('newdb')# result = newdb.count_documents({})result = newdb.count_documents({"uname": {"$eq": "张三"}})print(result)def test_aggregate(self):"""聚合统计:及格的学生成绩"""db = self.client.get_database('test')grades = db.get_collection('grades')result = grades.aggregate([# //where{'$match': {"grade.score": {'$gte': 60}}},# //group by{'$group': {'_id': "$stu_no",'total': {'$sum': 1}}},# // having{'$match': {'total': {'$eq': 3}}}])for item in result:print(item)if __name__ == '__main__':obj = LearnMongoDBSearch()# obj.search_one()# obj.search_user_by_pk('6411ee77b6170000b4003f95')# obj.search_many()# stu_list = obj.paginate(page=3)# for item in stu_list:# print(item)# obj.sort_data()# obj.counter_students()obj.test_aggregate()
回顾,更新数据表达式,如下表所示:
修改一个文档,调用方法:
update_one(filter, update, *args)
替换一个文档,调用方法:
replace_one(filter, replacement, *args)
批量修改文档,调用方法:
result = db.COLLECTION_NAME.update_many(filter,update,*args)
快捷方法:
find_one_and_update(filter, update, *args) # 修改一个文档
find_one_and_replace(filter, replacement, *args) # 替换一个文档
# 注意返回值的不同
返回结果:
acknowledged:结果是否已经被确认
modified_count:修改的文档数
matched_count:满足条件的文档数
raw_result:原始数据
upserted_id:更新的ID (upsert=True)
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2023-03-18 14:56
# @Author : AmoXiang
# @File : 4.修改文档.py
# @Software: PyCharm
# @Blog : https://blog.csdn.net/xw1680from pymongo import MongoClientclass LearnMongoDBUpdate(object):""" MongoDB更新练习 """def __init__(self):self.client = MongoClient()def test_update_one(self):""" 更新一个文档 """db = self.client.get_database('test')newdb = db.get_collection('newdb')result = newdb.update_one({}, {'$set': {'likes': 70}})print(result.modified_count)def test_replace_one(self):""" 替换一个文档 """db = self.client.get_database('test')newdb = db.get_collection('newdb')result = newdb.replace_one({}, {'uname': '张三'})print(result.modified_count)def test_update_many(self):""" 批量更新文档 """db = self.client.get_database('test')newdb = db.get_collection('newdb')result = newdb.update_many({}, {'$set': {'likes': 90}})print('修改的数量:', result.modified_count)print('满足条件的数量:', result.matched_count)def test_update_shortcut(self):""" 更新文档的快捷方法 """db = self.client.get_database('test')newdb = db.get_collection('newdb')# 此处返回一个文档对象result = newdb.find_one_and_update({}, {'$set': {'sex': '未知'}})print(result['_id'])if __name__ == '__main__':obj = LearnMongoDBUpdate()# obj.test_update_one()# obj.test_replace_one()# obj.test_update_many()obj.test_update_shortcut()
删除一个文档,调用方法如下:
result = db.COLLECTION_NAME.delete_one(filter, *args)
# 返回已经删除的记录数 result.deleted_count
# 删除时返回文档对象
find_one_and_delete(filter, *args)
批量删除文档,调用方法:
result = db.COLLECTION_NAME.delete_many(filter, *args)
# 返回已经删除的记录数
result. deleted_count
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2023-03-18 14:34
# @Author : AmoXiang
# @File : 3.删除文档.py
# @Software: PyCharm
# @Blog : https://blog.csdn.net/xw1680from pymongo import MongoClientclass LearnMongoDBDelete(object):""" MongoDB删除练习 """def __init__(self):self.client = MongoClient()def test_delete_one(self):""" 删除一个文档 """db = self.client.get_database('test')newdb = db.get_collection('newdb')result = newdb.delete_one({})print(result.deleted_count)def test_delete_many(self):""" 批量删除文档 """db = self.client.get_database('test')users = db.get_collection('newdb')# 删除所有的数据result = users.delete_many({"likes": {"$lte": 90}})print(result.deleted_count)def test_delete_shortcut(self):""" 删除文档的快捷方法 """db = self.client.get_database('test')newdb = db.get_collection('newdb')result = newdb.find_one_and_delete({})print(result['title'])if __name__ == '__main__':obj = LearnMongoDBDelete()# obj.test_delete_one()# obj.test_delete_shortcut()obj.test_delete_many()
至此今天的学习就到此结束了,笔者在这里声明,笔者写文章只是为了学习交流,以及让更多学习数据库的读者少走一些弯路,节省时间,并不用做其他用途,如有侵权,联系博主删除即可。感谢您阅读本篇博文,希望本文能成为您编程路上的领航者。祝您阅读愉快!
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请点赞
、评论
、收藏
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
编码不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!