I'm an inexperienced self-taught developer.
I have built an API with fastapi in python3, some endpoint are for authentication via ldap against a microsoft AD, those endpoints are handled via a router, included by the main app, I have just written a simple function that creates a serverpool object, checks the provided username to choose againsta which domain to authenticate and then binds the provided username and password, if the bind is succesfull It does a search for the user groups then returns the results and/or errors.
I'm trying to understand how to go about using a connection_pool to have persistent connection open to my ldap servers and do the bind and search operations using those connections.
I don't understand where should I put my ldap3 ServerPool and Connection object with a named connection_pool to call in my connection.
I'm guessing I need to use a multiprocess library, launch a process that instantiate the serverpool object with a connection_pool (using while? should I do create a class and instantiate the class in the separate process) and then call it from another thread (the fastapi /submit_credential endpoint), I don't know how I can structure my code to achieve just that, calling the serverpool connection pool from a function in another thread.
here's the code so far, stripped of non relevant parts (like logging and imports, this code runs and correctly replies with the bind result according to the provided user/password and extrapolating the correct domain):
I envisioned the following structure:
- at first call create a server pool object for the relevant domain AD servers
- create a connection pool with anonymous binds and no autobind
- use one of those connections to do a bind operation with the relevant user/password
- use one of those connections to do a search operation with either the provided user/password or a specific account that will be created for this purpose
- on subsequent call to the api the connection pool created in point 2 will be used
this question is as much about the technical aspect of creating a connection_pool as it is about a wider design perspective, suggestions in that area are welcome.
- /app/main.py
app = FastAPI(
dependencies=[Depends(get_api_key)]
)
app.include_router(useractions)
@app.get("/")
async def root():
logger.info("root api called")
return {"message": "please read the docs at endpoint /docs"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, log_config=None, host="0.0.0.0", port=8000)
- /app/routers/useractions.py
useractions = APIRouter(
prefix="/auth",
tags=["authentication"],
dependencies=[Depends(get_api_key)],
responses={404: {"description": "Not found"}},
)
@useractions.post('/submit_credentials')
async def parse_credentials(userlogin: UserLogin):
result = ldap_auth(userlogin.username,userlogin.password)
return result
- /app/dependecies/ldapauth.py
def ldap_auth(username, password):
try:
response = {}
pool_addresses = find_ldap_server(username)
ldap_pool = ServerPool(None,"FIRST", active=True, exhaust=True)
for server_address in pool_addresses:
ldap_server = Server(ip_address, use_ssl=True, get_info="DSA")
ldap_pool.add(ldap_server)
conn = Connection(ldap_pool, user=username, password=password, auto_bind=True, raise_exceptions=True)
active_server_default_naming_context = conn.server.__dict__['_dsa_info'].__dict__['other']['defaultNamingContext']
searchfilter="(UserPrincipalName=" + username + ")"
conn.start_tls
conn.search(
search_base=active_server_default_naming_context,
search_filter=searchfilter,
search_scope="SUBTREE",
attributes=["memberOf"]
)
groups = conn.entries[0].memberOf.values
response = {"outcome": "User_Valid", "username": username, "groups": groups}
return response
except Exception as ldaperror:
logger.error("ldap_auth error: {0}".format(ldaperror))
response = {"outcome": "User_LDAP_Error"}
error = ldaperror.__dict__
response.update(error)
return response
def find_ldap_server(username):
try:
domain = username.split('@')[1]
servers_list = [
{"domain": "domain1", "addresses": ("dc1.domain1.internal","dc2.domain2.internal")},
{"domain": "domain2.internal", "addresses": ("dc1.domain2.internal","dc2.domain2.internal")}
]
res = None
for sub in servers_list:
if sub['domain'] == domain:
res = sub
break
server_pool = res["addresses"]
return server_pool
except Exception as error:
logger.error("find_ldap_server error: {0}".format(error))
return error