Working with MongoDB & Python

·1357 words·7 mins·
netdevops blog python mongodb
Maximilian Thoma

MongoDB is considered an easy-to-use database for several reasons, particularly when used in conjunction with PyMongo.

  1. Schema-less Design

    MongoDB is a NoSQL database that uses a flexible, schema-less design. This means you don’t need to define a schema before inserting data. Documents (similar to rows in a relational database) can have different structures, allowing for greater flexibility and quicker iteration during development.
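    (A short code sketch of this is shown right after the list.)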

  2. JSON-like Documents

    MongoDB stores data in BSON format (Binary JSON). This makes it very intuitive to use, as JSON is a widely used data format, especially in web development. The documents in MongoDB resemble JSON objects, making it easy for developers to read and write data.

  3. Dynamic and Scalable

    MongoDB is designed to be highly scalable, with built-in support for sharding (horizontal scaling) and replication (high availability). This dynamic scaling capability makes it suitable for applications that need to grow quickly or handle large volumes of data.

  4. PyMongo Integration

    PyMongo is the official MongoDB driver for Python. It provides a simple and powerful way to interact with MongoDB from a Python application. PyMongo’s API is straightforward, making it easy to perform common database operations such as CRUD (Create, Read, Update, Delete) with minimal boilerplate code.

  5. Rich Query Language

    MongoDB offers a rich query language that allows for complex queries, including filtering, aggregation, and text search. PyMongo makes it easy to leverage these features, providing a seamless way to execute and manage queries.

  6. Strong Community and Documentation

    Both MongoDB and PyMongo have extensive documentation and a strong community. This ensures that developers can easily find resources, tutorials, and solutions to common problems, facilitating a smoother development experience.
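
To make points 1 and 2 concrete, here is a minimal sketch (the people collection name and the sample documents are made up for illustration; the credentials match the Docker setup further below): two documents with completely different structures can be stored in the same collection without defining any schema first.

from pymongo import MongoClient

# Connect to a local MongoDB instance (credentials from the Docker setup below)
client = MongoClient("mongodb://root:example@localhost:27017/")
collection = client["mydatabase"]["people"]

# No schema required: these two documents have different fields and nesting
collection.insert_one({"name": "Alice", "age": 28})
collection.insert_one({"name": "Bob", "hobbies": ["hiking", "chess"], "address": {"city": "Munich"}})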

Getting started
#

If you are using a development machine with local Docker, you can start your own instance of MongoDB to do some experiments.

This docker-compose file includes mongo and mongo-express. mongo-express is a nice web frontend for looking into the database.

docker-compose.yaml
#

version: '3.1'
 
services:
 
  mongo:
    image: mongo
    restart: always
    environment:
      MONGO_INITDB_ROOT_USERNAME: root
      MONGO_INITDB_ROOT_PASSWORD: example
    ports:
      - 27017:27017
 
  mongo-express:
    image: mongo-express
    restart: always
    ports:
      - 8081:8081
    environment:
      ME_CONFIG_MONGODB_ADMINUSERNAME: root
      ME_CONFIG_MONGODB_ADMINPASSWORD: example
      ME_CONFIG_MONGODB_URL: mongodb://root:example@mongo:27017/
      ME_CONFIG_BASICAUTH: false

Use the docker compose command to start MongoDB.

docker compose up

If you want to enter mongosh (the MongoDB shell), use the following commands:

# Identify your mongo container name with the "docker ps" command (look at the NAMES column)
docker ps
 
# Enter mongosh
docker exec -it mongo-mongo-1 mongosh

Basic usage examples
#

This code shows the easiest way to get started with PyMongo and a MongoDB database, covering the standard CRUD operations.

from pymongo import MongoClient
 
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
 
# Connect to the MongoDB server
client = MongoClient(connection_string)
 
# Access a specific database
db = client['mydatabase']
 
# Access a specific collection
collection = db['mycollection']
 
# Insert a document
document = {"name": "John", "age": 30, "city": "New York"}
collection.insert_one(document)
 
# Find a document
result = collection.find_one({"name": "John"})
print(result)
 
# Update a document
collection.update_one({"name": "John"}, {"$set": {"age": 31}})
 
# Delete a document
collection.delete_one({"name": "John"})
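
Beyond these single-document calls, PyMongo also has bulk counterparts (insert_many, update_many, delete_many), and find() returns a cursor you can iterate over. A small sketch, reusing the collection handle from the example above (the sample documents are just for illustration):

# Insert several documents at once
collection.insert_many([
    {"name": "Alice", "age": 28, "city": "Berlin"},
    {"name": "Bob", "age": 42, "city": "Munich"},
])

# find() returns a cursor; iterate over it to get all matching documents
for doc in collection.find({"age": {"$gt": 30}}):
    print(doc)

# Bulk update and delete
collection.update_many({"city": "Berlin"}, {"$set": {"country": "Germany"}})
collection.delete_many({"age": {"$lt": 50}})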

Some advanced examples
#

Search two different list fields for values from a list and get the document if one value matches
#

I used this search over a collection of Fortinet firewall rules to find all rules that contain at least one of a set of objects.

This is the list whose values should be searched for in two different fields:

Search list
#

searchlist = ['A', 'B', 'C']

The documents look like this:

[
  {
    "_id": ObjectId("6661e5210af04eab1211f6e2"),
    "name": "rule aaa",
    "dstaddr": ['C', 'E', 'F'],
    "srcaddr": ['X', 'Y', 'Z'],
  },
  {
    "_id": ObjectId("6661e5210af04eab1211f6f1"),
    "name": "rule zzz",
    "dstaddr": ['C', 'T', 'G'],
    "srcaddr": ['A', 'W', 'Z'],
  },
  {
    "_id": ObjectId("6661e5210af04eab1211f6a1"),
    "name": "rule xyz",
    "dstaddr": ['G', 'E', 'F'],
    "srcaddr": ['X', 'Y', 'Z'],
  }
]

The search:

# 'col_fw_rules' is a variable which points to the firewall ruleset collection
 
searchlist = ['A', 'B', 'C']
 
fw_rules_result = col_fw_rules.find(
    {"$or": [
        {"dstaddr": {"$in": searchlist}},
        {"srcaddr": {"$in": searchlist}}
      ]
    }
)

for match in fw_rules_result:
    print(match)

Result:

# Only matching rules are returned
{"_id": ObjectId("6661e5210af04eab1211f6e2"), "name": "rule aaa", "dstaddr": ['C', 'E', 'F'], "srcaddr": ['X', 'Y', 'Z']},
{"_id": ObjectId("6661e5210af04eab1211f6f1"), "name": "rule zzz", "dstaddr": ['C', 'T', 'G'], "srcaddr": ['A', 'W', 'Z']}

Aggregate search
#

Aggregate documents that match scope dmz and site eberfing, group them by the cidr field, count them, and only return groups that have more than one document.

I use this search to find duplicate objects in the firewall object database which I have imported to my mongo database.

aggregate_example.py
#

# 'col_fw_objects' points to my firewall objects collection
 
pipeline = [
    # first do a prematch on the data you want to group
    {
        '$match': {
            'scope': 'dmz',
            'site': 'eberfing'
        }
    },
    # now group your search results
    {
        '$group': {
            '_id': '$cidr',
            'documents': {'$push': '$$ROOT'},
            'count': {'$sum': 1}
        }
    },
    # only give back groups where count is greater than 1
    {
        '$match': {
            'count': {'$gt': 1}
        }
    }
]
 
# execute the aggregate
result = col_fw_objects.aggregate(pipeline)
 
# working with the results
for group in result:
    print(f"CIDR: {group['_id']}")
    for doc in group['documents']:
        print(doc)

Use a mongo collection with TTL to auto-delete documents
#

You can use a collection like a time-series store and auto-delete documents after a defined time.

Just add a field with the timestamp when the document was created, in this example "createdAt", and create an index with "expireAfterSeconds". MongoDB will then automatically clean up expired documents.

ttl_example.py
#

from pymongo import MongoClient
from datetime import datetime
 
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
 
# Connect to the MongoDB server
client = MongoClient(connection_string)
 
# Access a specific database
db = client['mydatabase']
 
# Access a specific collection
collection = db['mycollection']
 
# Insert a document with a timestamp
document = {"name": "John", "age": 30, "city": "New York", "createdAt": datetime.utcnow()}
collection.insert_one(document)
 
# Create a TTL index on the "createdAt" field, set to expire documents after 30 days (2592000 seconds)
collection.create_index("createdAt", expireAfterSeconds=2592000)

Set a field to unique
#

Sometimes you want to prevent duplicate documents. In this case you can create an index and set a field to unique. If you try to insert a document with the same value, an error will be raised.

unique_example.py
#

from pymongo import MongoClient
from datetime import datetime
 
# Define the MongoDB connection string with authentication
username = 'root'
password = 'example'
host = 'localhost'
port = 27017
connection_string = f'mongodb://{username}:{password}@{host}:{port}/'
 
# Connect to the MongoDB server
client = MongoClient(connection_string)
 
# Access a specific database
db = client['mydatabase']
 
# Access a specific collection
collection = db['mycollection']
 
# Ensure uniqueness by creating a unique index on the 'email' field
collection.create_index("email", unique=True)
 
# Insert documents
try:
    document1 = {"name": "John", "age": 30, "city": "New York", "email": "john@example.com", "createdAt": datetime.utcnow()}
    document2 = {"name": "Jane", "age": 25, "city": "Los Angeles", "email": "jane@example.com", "createdAt": datetime.utcnow()}
    document3 = {"name": "John", "age": 35, "city": "Chicago", "email": "john@example.com", "createdAt": datetime.utcnow()}  # This will cause a duplicate error
     
    collection.insert_one(document1)
    collection.insert_one(document2)
    collection.insert_one(document3)  # This insertion will fail due to unique index constraint
 
except Exception as e:
    print(f"An error occurred: {e}")
 
# Find documents
for doc in collection.find():
    print(doc)
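
If you only want to react to the duplicate case, PyMongo raises pymongo.errors.DuplicateKeyError for unique-index violations, so you can catch that instead of a generic Exception; a minimal sketch:

from pymongo.errors import DuplicateKeyError

try:
    collection.insert_one({"name": "John", "email": "john@example.com"})
except DuplicateKeyError as e:
    print(f"Duplicate email, document not inserted: {e}")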

Other examples
#

These are examples from mongosh, but the searches you run in mongosh can be executed almost one-to-one in PyMongo.

Find in the "grid" collection all records with count 0, aci_endpoint_count 0, and a subnet that is not empty.

test> use database
database> db.grid.find({count: 0, aci_endpoint_count:0, subnet:{$ne: ""}})

To count the results, just add .count() to your search

test> use database
database> db.grid.find({count: 0, aci_endpoint_count:0, subnet:{$ne: ""}}).count()

Find records where you expect that a field called member exists

test> use database
database> db.fw_rules.find({member: {$exists: true}})

Use a variable in mongosh and integrate it into a search

test> use database
database> let search = "xxx";
database> db.fw_rules.find({$or:[{srcaddr:{$in: [search]}},{dstaddr:{$in: [search]}}]})

Count the unique names in a collection in the name field

test> use database
database> var uniqueNameCount = db.fw_objects.aggregate([{$group: {_id: "$name", count: { $sum: 1 }}}, { $match: {count: 1}}, { $count: "uniqueNamesCount" }]);
database> uniqueNameCount
[ { uniqueNamesCount: 9461 } ]
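
To illustrate the one-to-one translation, here is a sketch of how the first two mongosh queries above could look in PyMongo (assuming db is a database handle as in the earlier examples):

# Equivalent of db.grid.find({count: 0, aci_endpoint_count: 0, subnet: {$ne: ""}})
for doc in db["grid"].find({"count": 0, "aci_endpoint_count": 0, "subnet": {"$ne": ""}}):
    print(doc)

# In PyMongo, count matches with count_documents() on the collection
n = db["grid"].count_documents({"count": 0, "aci_endpoint_count": 0, "subnet": {"$ne": ""}})
print(n)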

Additional information
#
