MongoDB is considered an easy-to-use database for several reasons, particularly when used in conjunction with PyMongo.
Schema-less Design
MongoDB is a NoSQL database that uses a flexible, schema-less design. This means you don’t need to define a schema before inserting data. Documents (similar to rows in a relational database) can have different structures, allowing for greater flexibility and quicker iteration during development.
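For instance, two documents with completely different fields can be stored in the same collection. A minimal sketch with PyMongo (the connection string, database, and collection names are placeholders):
from pymongo import MongoClient

# Connection details are placeholders; adjust them to your environment
client = MongoClient("mongodb://root:example@localhost:27017/")
collection = client["mydatabase"]["people"]

# Two documents with completely different fields can live in the same
# collection, no schema definition required
collection.insert_one({"name": "John", "age": 30})
collection.insert_one({"name": "Jane", "email": "jane@example.com", "roles": ["admin", "dev"]})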
JSON-like Documents
MongoDB stores data in BSON format (Binary JSON). This makes it very intuitive to use, as JSON is a widely used data format, especially in web development. The documents in MongoDB resemble JSON objects, making it easy for developers to read and write data.
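A document maps almost directly to a Python dict, including nested sub-documents and lists (the field names here are made up):
# A MongoDB document expressed as a Python dict
person = {
    "name": "John",
    "address": {"city": "New York", "zip": "10001"},
    "hobbies": ["chess", "climbing"],
}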
Dynamic and Scalable
MongoDB is designed to be highly scalable, with built-in support for sharding (horizontal scaling) and replication (high availability). This dynamic scaling capability makes it suitable for applications that need to grow quickly or handle large volumes of data.
PyMongo Integration
PyMongo is the official MongoDB driver for Python. It provides a simple and powerful way to interact with MongoDB from a Python application. PyMongo’s API is straightforward, making it easy to perform common database operations such as CRUD (Create, Read, Update, Delete) with minimal boilerplate code.
Rich Query Language
MongoDB offers a rich query language that allows for complex queries, including filtering, aggregation, and text search. PyMongo makes it easy to leverage these features, providing a seamless way to execute and manage queries.
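A small sketch of a filter query and a text search with PyMongo (assuming a 'collection' handle as in the examples below; the text index and search term are made up):
# Filter query: exact match combined with a range condition
results = collection.find({"city": "New York", "age": {"$gte": 21}})

# Text search requires a text index on the field first
collection.create_index([("name", "text")])
for doc in collection.find({"$text": {"$search": "John"}}):
    print(doc)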
Strong Community and Documentation
Both MongoDB and PyMongo have extensive documentation and a strong community. This ensures that developers can easily find resources, tutorials, and solutions to common problems, facilitating a smoother development experience.
Getting started#
If you are using a development machine with local Docker, you can start your own MongoDB instance to do some experiments.
This docker-compose file includes mongo and mongo-express. mongo-express is a handy web frontend for browsing the database (reachable at http://localhost:8081 with this setup).
docker-compose.yaml#
version: '3.1'

services:

  mongo:
    image: mongo
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - 27017:27017

  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/
      ME_CONFIG_BASICAUTH: false
Use the docker compose command to start MongoDB.
docker compose up
If you want to enter mongosh (the MongoDB shell), use the following commands:
# Identify your mongo container name with the "docker ps" command (look at the NAMES column)
docker ps
# Enter mongosh
docker exec -it mongo-mongo-1 mongosh
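Alternatively, you can verify the connection from Python. A minimal sketch, assuming the compose stack above is running with the credentials shown:
from pymongo import MongoClient

client = MongoClient("mongodb://root:example@localhost:27017/")
# The 'ping' command returns {'ok': 1.0} if the server is reachable and the credentials work
print(client.admin.command("ping"))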
Basic usage examples#
This code shows the easiest way to get started with PyMongo and a MongoDB database, covering the standard CRUD cases.
from pymongo import MongoClient
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
# Connect to the MongoDB server
client = MongoClient(connection_string)
# Access a specific database
db = client['mydatabase']
# Access a specific collection
collection = db['mycollection']
# Insert a document
document = {"name": "John", "age": 30, "city": "New York"}
collection.insert_one(document)
# Find a document
result = collection.find_one({"name": "John"})
print(result)
# Update a document
collection.update_one({"name": "John"}, {"$set": {"age": 31}})
# Delete a document
collection.delete_one({"name": "John"})
Some advanced examples#
Search for values from a list in two different array fields and return the document if one matches#
I used this search on a collection of Fortinet firewall rules to find all rules that contain any object from a given set.
This is the list whose values should be searched for in two different fields:
Search list#
searchlist = ['A', 'B', 'C']
The documents look like this:
[
    {
        "_id": ObjectId("6661e5210af04eab1211f6e2"),
        "name": "rule aaa",
        "dstaddr": ['C', 'E', 'F'],
        "srcaddr": ['X', 'Y', 'Z'],
    },
    {
        "_id": ObjectId("6661e5210af04eab1211f6f1"),
        "name": "rule zzz",
        "dstaddr": ['C', 'T', 'G'],
        "srcaddr": ['A', 'W', 'Z'],
    },
    {
        "_id": ObjectId("6661e5210af04eab1211f6a1"),
        "name": "rule xyz",
        "dstaddr": ['G', 'E', 'F'],
        "srcaddr": ['X', 'Y', 'Z'],
    }
]
The search:
# 'col_fw_rules' is a variable which points to the firewall ruleset collection
searchlist = ['A', 'B', 'C']
fw_rules_result = col_fw_rules.find(
    {"$or": [
        {"dstaddr": {"$in": searchlist}},
        {"srcaddr": {"$in": searchlist}}
    ]}
)

for match in fw_rules_result:
    print(match)
Result:
# Only matching rules are returned
{"_id": ObjectId("6661e5210af04eab1211f6e2"), "name": "rule aaa", "dstaddr": ['C', 'E', 'F'], "srcaddr": ['X', 'Y', 'Z']},
{"_id": ObjectId("6661e5210af04eab1211f6f1"), "name": "rule zzz", "dstaddr": ['C', 'T', 'G'], "srcaddr": ['A', 'W', 'Z']}
Aggregate search#
Aggregate documents that match scope dmz and site eberfing, group them by the cidr field, count them, and return only groups that contain more than one document.
I use this search to find duplicate objects in the firewall object database that I imported into my MongoDB database.
aggregate_example.py#
# 'col_fw_objects' points to my firewall objects collection
pipeline = [
    # first, pre-match the data you want to group
    {
        '$match': {
            'scope': 'dmz',
            'site': 'eberfing'
        }
    },
    # now group your search results
    {
        '$group': {
            '_id': '$cidr',
            'documents': {'$push': '$$ROOT'},
            'count': {'$sum': 1}
        }
    },
    # return only groups with a count greater than 1
    {
        '$match': {
            'count': {'$gt': 1}
        }
    }
]

# execute the aggregation
result = col_fw_objects.aggregate(pipeline)

# work with the results
for group in result:
    print(f"CIDR: {group['_id']}")
    for doc in group['documents']:
        print(doc)
Use a mongo collection with TTL to auto-delete documents#
You can use a TTL (time-to-live) index to automatically delete documents from a collection after a defined time.
Just add a field with the timestamp of when the document was created, in this example “createdAt”. Create an index with “expireAfterSeconds”, and MongoDB will automatically clean up expired documents (the TTL background task runs roughly every 60 seconds, so deletion is not instantaneous).
ttl_example.py#
from pymongo import MongoClient
from datetime import datetime
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
# Connect to the MongoDB server
client = MongoClient(connection_string)
# Access a specific database
db = client['mydatabase']
# Access a specific collection
collection = db['mycollection']
# Insert a document with a timestamp
document = {"name": "John", "age": 30, "city": "New York", "createdAt": datetime.utcnow()}
collection.insert_one(document)
# Create a TTL index on the "createdAt" field, set to expire documents after 30 days (2592000 seconds)
collection.create_index("createdAt", expireAfterSeconds=2592000)
Set a field to unique#
Sometimes you want to prevent duplicate documents. In this case you can create an index and set a field to unique. If you try to insert a document with the same value, an error will be raised.
unique_example.py#
from pymongo import MongoClient
from datetime import datetime
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
# Connect to the MongoDB server
client = MongoClient(connection_string)
# Access a specific database
db = client['mydatabase']
# Access a specific collection
collection = db['mycollection']
# Ensure uniqueness by creating a unique index on the 'email' field
collection.create_index("email", unique=True)
# Insert documents
try:
    document1 = {"name": "John", "age": 30, "city": "New York", "email": "john@example.com", "createdAt": datetime.utcnow()}
    document2 = {"name": "Jane", "age": 25, "city": "Los Angeles", "email": "jane@example.com", "createdAt": datetime.utcnow()}
    document3 = {"name": "John", "age": 35, "city": "Chicago", "email": "john@example.com", "createdAt": datetime.utcnow()}  # This will cause a duplicate error
    collection.insert_one(document1)
    collection.insert_one(document2)
    collection.insert_one(document3)  # This insertion will fail due to the unique index constraint
except Exception as e:
    print(f"An error occurred: {e}")

# Find documents
for doc in collection.find():
    print(doc)
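If you prefer to handle the duplicate case explicitly instead of catching every exception, PyMongo raises DuplicateKeyError for unique index violations. A small sketch, reusing the collection from above:
from pymongo.errors import DuplicateKeyError

try:
    collection.insert_one({"name": "John", "email": "john@example.com"})
except DuplicateKeyError:
    # The unique index on 'email' rejected the duplicate value
    print("A document with this email already exists")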
Other examples#
These examples are from mongosh, but the searches you run in mongosh can be translated one-to-one to PyMongo (see the sketch at the end of this section).
Find in the “grid” collection all records with count 0, aci_endpoint_count 0, and a subnet field that is not empty.
test> use database
database> db.grid.find({count: 0, aci_endpoint_count:0, subnet:{$ne: ""}})
To count the results, just add .count() to your search.
test> use database
database> db.grid.find({count: 0, aci_endpoint_count:0, subnet:{$ne: ""}}).count()
Find records where a field called member exists.
test> use database
database> db.fw_rules.find({member: {$exists: true}})
Define a variable in mongosh and use it in a search.
test> use database
database> let search = "xxx";
database> db.fw_rules.find({$or: [{srcaddr: {$in: [search]}}, {dstaddr: {$in: [search]}}]})
Count the names in the name field of a collection that occur exactly once (unique names).
test> use database
database> var uniqueNameCount = db.fw_objects.aggregate([{$group: {_id: "$name", count: { $sum: 1 }}}, { $match: {count: 1}}, { $count: "uniqueNamesCount" }]);
database> uniqueNameCount
[ { uniqueNamesCount: 9461 } ]
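As a sketch of the one-to-one translation mentioned above (assuming a db handle to the same database, with the same collection and field names), the grid and $exists examples look like this in PyMongo:
# Same filter as the mongosh example on the "grid" collection
query = {"count": 0, "aci_endpoint_count": 0, "subnet": {"$ne": ""}}

for doc in db["grid"].find(query):
    print(doc)

# PyMongo cursors have no .count(); use count_documents() on the collection instead
print(db["grid"].count_documents(query))

# Find documents where a field called "member" exists
for doc in db["fw_rules"].find({"member": {"$exists": True}}):
    print(doc)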