Python LDAP3 操作 LDAP 实现增删改查

5 篇文章 1 订阅
2 篇文章 0 订阅

在介绍具体实现之前先啰嗦几句,很多运维朋友或正在使用 LDAP 的朋友来说,有很多场景需要对 LDAP 进行操作,比如:员工离职、员工更换部门、提升员工相应权限等。诸如以上的的操作都需要操作 LDAP 来实现,那么有几种办法来试下呢?
第一种:使用原始的ldap命令如 ldappadd、ldapsearch、ldapdelete、ldappasswd、ldapmodify等来操作。
第二种:使用 LDAP 客户端来操作,Windows 自带管理软件或者其他,Mac 使用ApacheDirectorystudio 通用的管理软件,Linux 如果是桌面版本同样可以使用 ApacheDirectorystudio。
本人之前同样写过一个简单版的,简单版链接,本次针对改版本进行优化改进。

简单说下以上 两种的缺点:

  • 第一种

    • 需要登陆远端服务器或本地 shell 安装相关命令;
    • 需要对 ldap 所有的命令有更深的了解;
    • 操作门槛高;
  • 第二种

    • 无论使用哪种客户端都需要在本机安装 Java 环境;
    • 操作需要预先学习软件;

针对以上情况下文介绍一种拿来即用的简单基于 Python3 的类库 LDAP3 实现对 LDAP 操作,实现日常所需的基本所有功能,增删改查等一系列操作,接下来看下实现过程;

具体效果展示

 python create_user.py -h                          
usage: create_user.py [-h] [-s HOST] [-o PORT] [-u USER] [-p PASSWORD]
                      [-b BASE_DC] [-n USER_NAME] [-g GROUP_NAME] [--admin]
                      [-i GID]

Arguments for talking to LDAP

optional arguments:
  -h, --help            show this help message and exit

standard arguments:
  -s HOST, --host HOST  LDAP service address to connect to
  -o PORT, --port PORT  Port to connect on
  -u USER, --user USER  User name to use when connecting to host
  -p PASSWORD, --password PASSWORD
                        Password to use when connecting to host
  -b BASE_DC, --base_dc BASE_DC
                        Base bc to use when connecting to host

sample-specific arguments:
  -n USER_NAME, --user_name USER_NAME
                        操作的用户名
  -g GROUP_NAME, --group_name GROUP_NAME
                        操作的用户组
  --admin               是否是LDAP Admin 默认是普通用户,加上该参数是管理员
  -i GID, --gid GID     设置组 ID

具体实现

设计说明:

  1. 由于每次的操作都需要进行一次 LDAP 的连接操作,故设计 LDAP 连接 读取默认配置,并且保留外传连接的可能,即如果配置过默认连接参数,在执行过程中无需输入链接信息;
  2. 解耦连接服务与操作具体操作LDAP;
  3. 增加自定义参数,方便后续扩展;
  4. 扩展类的抽离,如需添加额外功能增加方法即可;
  5. 操作过程使用外部传参,并对参数进行说明;

以下列出本次实现的主要文件与代码,如需源码请移步 源代码链接

  • 配置文件配置设计
# -*- coding: utf-8 -*-

__author__ = "ModeYl"


_LDAP_HOST = "ldap-server.magic.com"
_LDAP_PORT = 389
_LDAP_USER = "cn=manager,dc=magic,dc=com"
_LDAP_PASSWORD = "root123"
_LDAP_BASE_DC = "dc=magic,dc=com"


def get_config_parameter(parameter):
    if parameter == 'LDAP_HOST':  # String
        return _LDAP_HOST
    elif parameter == 'LDAP_PORT':  # int
        return _LDAP_PORT
    elif parameter == 'LDAP_USER':  # String
        return _LDAP_USER
    elif parameter == 'LDAP_PASSWORD':  # String
        return _LDAP_PASSWORD
    elif parameter == 'LDAP_BASE_DC':  # String
        return _LDAP_BASE_DC

  • 服务接口设计类
# -*- coding: utf-8 -*-

__author__ = "Mode, YL"

from ldap3 import Server, Connection, SAFE_SYNC, ALL, MODIFY_ADD, MODIFY_DELETE, MODIFY_REPLACE
from tools.serviceutil import *


class LDAP:
    def __init__(self, args):
        # 定义 server
        self.s = Server(args.host, get_info=ALL)
        # 定义链接
        self.conn = Connection(self.s, args.user, args.password, client_strategy=SAFE_SYNC, auto_bind=True)

        # 获取所有参数
        self.args = args

    def get_users(self, **kwargs):
        """

        :return: LIST: 操作的响应结果,包含 UID
        """
        # status: 操作是否成功 result: 操作的结果 response: 操作的响应结果 request: 原始发送的请求
        status, result, response, _ = self.conn.search('ou={},{}'.format(kwargs.get('ou'), self.args.base_dc),
                                                       '(objectclass=posixAccount)',
                                                       attributes=['uid'])  # attributes 限制查询出来的属性包括

        data = [row['attributes'] for row in response]

        return data

    def get_ou(self):
        """
        获取顶级所有 OU信息
        :return:
        """

        search_filter = '(objectclass=organizationalUnit)'

        status, result, response, _ = self.conn.search(self.args.base_dc, search_filter, attributes=['ou'])

        data = [row['attributes'] for row in response]

        return data

    def get_groups(self):
        """
        获取 Groups ou 下所有组信息
        :return: 返回所有组信息
        """
        search_filter = '(|(objectclass=posixGroup)(objectclass=groupOfUniqueNames))'

        status, result, response, _ = self.conn.search('ou=Groups,{}'.format(self.args.base_dc), search_filter,
                                                       attributes=['cn'])

        data = [row['attributes'] for row in response]

        return data

    def create_user(self, *args, **kwargs):
        """

        :param args: 已经存在的所有用户信息
        :param kwargs: 要创建的用户信息以及属性信息
        :return:
        """
        if not ldap_search(*args, **kwargs):
            ou = "ou={}".format(kwargs.get('ou'))
            uid = "uid={}".format(kwargs.get('uid'))
            # 根据定义的用户角色生成不同的dn 信息
            if kwargs.get('admin'):
                cn = "cn={}".format(kwargs.get('attribute').get('cn'))
                dn = "{},{},{}".format(cn, ou, self.args.base_dc)
            else:
                dn = "{},{},{}".format(uid, ou, self.args.base_dc)
            object_class = kwargs.get('objectclass')
            attribute = kwargs.get('attribute')
            status, result, response, _ = self.conn.add(dn, object_class, attribute)
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "User Is Exist"}

    def delete_user(self, *args, **kwargs):
        """
        删除组内成员
        :param args: 所有用户的 LIST
        :param kwargs: 准备删除用户的信息
        :return:
        """

        if ldap_search(*args, **kwargs):
            ou = "ou={}".format(kwargs.get('ou'))
            uid = "uid={}".format(kwargs.get('uid'))
            dn = "{},{},{}".format(uid, ou, self.args.base_dc)
            status, result, response, _ = self.conn.delete(dn)
            # 如果创建成功直接返回 True,如果失败返回失败描述
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "User Is Not Found"}

    def create_group_ou(self, *args, **kwargs):
        """
        创建顶层 ou
        :param args: 传入的组信息
        :param kwargs: 传入要创建组信息,包括组名 角色类 具体参数值
        :return:
        """
        # 传入*args **kwargs目的,将变量直接处理成 数组和 dict
        if not ldap_search(*args, **kwargs):
            ou = kwargs.get('ou')
            dn = "ou={},{}".format(ou, self.args.base_dc)
            object_class = kwargs.get('object_class')
            attribute = kwargs.get('attribute')
            status, result, response, _ = self.conn.add(dn, object_class, attribute)
            # 如果创建成功直接返回 True,如果失败返回失败描述
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "OU is Exist"}

    def create_group_cn(self, *args, **kwargs):
        """
        创建顶层 ou下 cn
        :param args: 传入的组信息
        :param kwargs: 传入要创建组信息,包括组名 角色类 具体参数值
        :return:
        """
        # 传入*args **kwargs目的,将变量直接处理成 数组和 dict
        if not ldap_search(*args, **kwargs):
            ou = kwargs.get('ou')
            cn = kwargs.get('cn')
            dn = "cn={},ou={},{}".format(cn, ou, self.args.base_dc)
            objectclass = kwargs.get('objectclass')
            attribute = kwargs.get('attribute')
            status, result, response, _ = self.conn.add(dn, objectclass, attribute)
            # 如果创建成功直接返回 True,如果失败返回失败描述
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "CN already exists under Group"}

    def delete_group(self, *args, **kwargs):
        """

        :param args: 传入的组信息
        :param kwargs: 传入要创建组信息,包括组名 角色类 具体参数值
        :return:
        """
        # 传入*args **kwargs目的,将变量直接处理成 数组和 dict
        if ldap_search(*args, **kwargs):
            row = kwargs.get('row')
            dn = "{},{}".format(row, self.args.base_dc)
            status, result, response, _ = self.conn.delete(dn)
            # 如果创建成功直接返回 True,如果失败返回失败描述
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "Group Is Not Found"}

    def UserToGroup(self, *args, **kwargs):
        group_name = kwargs.get('cn')
        ou_name = kwargs.get('ou')
        # 获取操作类型
        action = kwargs.get('action')
        # 拼接完成的 DN
        dn = "cn={},ou={},{}".format(group_name, ou_name, self.args.base_dc)
        # 定义放回值信息
        status, result = None, None
        # 判断组是否存在
        if ldap_search(*args, **kwargs):
            if action == 'append':
                if kwargs.get('user_type') == 1:
                    status, result, response, _ = self.conn.modify(dn, {
                        'memberUid': [(MODIFY_ADD, kwargs.get('memberUid'))]})
                elif kwargs.get('user_type') == 2:
                    uniqueMember = [
                        "uid={},ou=Users,{}".format(kwargs.get('memberUid')[0], self.args.base_dc)]  # 根据输入的类型拼接要处理的 dn
                    status, result, response, _ = self.conn.modify(dn, {'uniqueMember': [(MODIFY_ADD, uniqueMember)]})

                if status:
                    return 1
                else:
                    return result['description']

            elif action == 'remove':
                if kwargs.get('user_type') == 1:
                    status, result, response, _ = self.conn.modify(dn, {
                        'memberUid': [(MODIFY_DELETE, kwargs.get('memberUid'))]})
                elif kwargs.get('user_type') == 2:
                    uniqueMember = [
                        "uid={},ou=Users,{}".format(kwargs.get('memberUid')[0], self.args.base_dc)]
                    status, result, response, _ = self.conn.modify(dn, {'memberUid': [(MODIFY_DELETE, uniqueMember)]})

                if status:
                    return 1
                else:
                    return result['description']

            else:
                return {"description": "Unknown operation"}
        else:
            return {"description": "Group {} Not Exist".format(group_name)}

    def batch_modify_attribute(self, *args, **kwargs):
        """
        批量更改属性
        :param args: 获取所有用户
        :param kwargs: 更改用户的属性名称和值{}
        :return:
        """
        # 要更改什么组织下
        ou = kwargs.get('ou')

        ret = {}

        for name in args:
            dn = "uid={},{},{}".format(name.get('uid')[0], ou, self.args.base_dc)
            user_mail = "{}{}".format(name.get('uid')[0], kwargs.get('mail_suffix'))
            status, result, response, _ = self.conn.modify(dn, {'mail': [(MODIFY_REPLACE, user_mail)]})
            ret[name.get('uid')[0]] = result['description']
        return ret

    def modify_uid(self, *args, **kwargs):

        """
        更改用户名称
        :param args: 传入已经存在的用户列表
        :param kwargs: 要更改的属性
        :return:
        """

        if ldap_search(*args, **kwargs):
            ou = "ou={}".format(kwargs.get('ou'))
            # 待更改的 UID 名称
            old_uid = "uid={}".format(kwargs.get('uid'))
            # 拼接要更改的 DN
            dn = "{},{},{}".format(old_uid, ou, self.args.base_dc)
            # 更改后的 UID 名称
            new_uid = "uid={}".format(kwargs.get('replace_uid'))
            # 要先进行对主属性 UID 更新
            status, result, response, _ = self.conn.modify_dn(dn, new_uid)
            # 此方法直接进行替换主属性 UID 时不正常,只能进行删除与替换新增的,能替换原有的. 在替换时报"namingViolation"
            # 重新赋值 dn, 根据新的 dn 替换相关属性
            new_dn = "{},{},{}".format(new_uid, ou, self.args.base_dc)
            status, result, response, _ = self.conn.modify(new_dn, {'cn': [(MODIFY_REPLACE, kwargs.get('replace_uid'))],
                                                                    'displayName': [
                                                                        (MODIFY_REPLACE, [kwargs.get('replace_uid')])],
                                                                    'givenName': [
                                                                        (MODIFY_REPLACE, [kwargs.get('replace_uid')])],
                                                                    'homeDirectory': [
                                                                        (MODIFY_REPLACE, ['/home/{}'.format(
                                                                            kwargs.get('replace_uid'))])],
                                                                    'mail': [(MODIFY_REPLACE, [
                                                                        '{}@magicengine.com.cn'.format(
                                                                            kwargs.get('replace_uid'))])]})
            # print(self.conn.search(dn, '({})'.format(new_uid),
            #                        attributes=['uid', 'cn', 'givenName', 'givenName', 'homeDirectory', 'mail']))
            if status:
                return 1
            else:
                return result['description']
        else:
            return {"description": "User Not Found Can't Change"}

    def batch_modify_attribute(self, *args, **kwargs):
        """
        批量更改属性
        :param args: 获取所有用户
        :param kwargs: 更改用户的属性名称和值{}
        :return:
        """
        # 要更改什么组织下
        ou = kwargs.get('ou')

        ret = {}

        # 获取要更改的属性名
        attribute_name = kwargs.get('will_attribute_name')

        # 获取要更爱的属性值
        attribute_value = kwargs.get('attribute_value')

        for name in args:
            dn = "uid={},ou={},{}".format(name.get('uid')[0], ou, self.args.base_dc)
            # 拼接准备更改的字段
            user_mail = "{}{}".format(name.get('uid')[0], kwargs.get('attribute_value'))
            status, result, response, _ = self.conn.modify(dn, {attribute_name: [(MODIFY_REPLACE, user_mail)]})
            ret[name.get('uid')[0]] = result['description']
        return ret

    def __del__(self):
        # 关闭链接
        self.conn.unbind()

  • 服务处理扩展设计
# -*- coding: utf-8 -*-

from ldap3.utils.hashed import hashed
from ldap3 import HASHED_SALTED_SHA
import random


def general_user_passwd(password):
    """
    生成用户密码
    :param password: str
    :return: 加密后的用户密码: 使用 ssha 形式
    """

    ret = hashed(HASHED_SALTED_SHA, password)

    return ret


def get_random_number_str(length):
    """
    生成随机数字字符串
    :param length: 字符串长度
    :return:
    """
    num_str = ''.join(str(random.choice(range(10))) for _ in range(length))
    return num_str


def ldap_search(*args, **kwargs):
    """
    私有方法,抽离处判断组信息是否存在 LDAP 已有组信息内
    :param args: LDAP 返回的信息--LIST
    :param kwargs: 准备创建的组信息--Dict
    :return: Boolean
    """

    search_string = kwargs.get('search_string')

    exist = False
    # 将传入的数据进行分解后,每个元素的类型是<class 'ldap3.utils.ciDict.CaseInsensitiveDict'>, 在继续判读要创建的组是否存在.
    for i in args:
        for k, v in i.items():
            # if v[0] == search_string:
            # 判断要搜索的字符串是否在列表中即可
            if search_string in v:
                exist = True
    return exist

  • 使用参数扩展设计
# -*- coding: utf-8 -*-
import argparse
import getpass
from config.con_conf import *

__author__ = "ModeYl"


class Parser:
    def __init__(self):
        self._parser = argparse.ArgumentParser(description='Arguments for talking to LDAP')
        self._standard_args_group = self._parser.add_argument_group('standard arguments')
        self._specific_args_group = self._parser.add_argument_group('sample-specific arguments')

        # because -h is reserved for 'help' we use -s for service
        self._standard_args_group.add_argument('-s', '--host',
                                               required=False,
                                               type=str,
                                               default=get_config_parameter('LDAP_HOST'),
                                               action='store',
                                               help='LDAP service address to connect to')

        # because we want -p for password, we use -o for port
        self._standard_args_group.add_argument('-o', '--port',
                                               required=False,
                                               type=int,
                                               default=389,
                                               action='store',
                                               help='Port to connect on')

        self._standard_args_group.add_argument('-u', '--user',
                                               required=False,
                                               type=str,
                                               default=get_config_parameter('LDAP_USER'),
                                               action='store',
                                               help='User name to use when connecting to host')

        self._standard_args_group.add_argument('-p', '--password',
                                               required=False,
                                               default=get_config_parameter('LDAP_PASSWORD'),
                                               action='store',
                                               help='Password to use when connecting to host')

        self._standard_args_group.add_argument('-b', '--base_dc',
                                               required=False,
                                               default=get_config_parameter('LDAP_BASE_DC'),
                                               action='store',
                                               help='Base bc to use when connecting to host')

    def get_args(self):
        """
        Supports the command-line arguments needed to form a connection to vSphere.
        """
        args = self._parser.parse_args()
        return self._prompt_for_password(args)

    def _add_sample_specific_arguments(self, is_required: bool, *args):
        """
        Add an argument to the "sample specific arguments" group
        Requires a predefined argument from the Argument class.
        """
        for arg in args:
            name_or_flags = arg["name_or_flags"]
            options = arg["options"]
            options["required"] = is_required
            self._specific_args_group.add_argument(*name_or_flags, **options)

    def add_optional_arguments(self, *args):
        """
        Add an optional argument to the "sample specific arguments" group.
        Requires a predefined argument from the Argument class.
        """
        self._add_sample_specific_arguments(False, *args)

    def _prompt_for_password(self, args):
        """
        if no password is specified on the command line, prompt for it
        """
        if not args.password:
            args.password = getpass.getpass(
                prompt='"--password" not provided! Please enter password for host %s and user %s: '
                       % (args.host, args.user))
        return args


class Argument:
    def __init__(self):
        pass

    USER_NAME = {
        'name_or_flags': ['-n', '--user_name'],
        'options': {'action': 'store', 'help': '操作的用户名'}
    }

    GROUP_NAME = {
        'name_or_flags': ['-g', '--group_name'],
        'options': {'action': 'store', 'help': '操作的用户组'}
    }

    GROUP_NUMBER = {
        'name_or_flags': ['-i', '--gid'],
        'options': {'action': 'store', 'type': int, 'default': 505, 'help': '设置组 ID'}
    }

    ADMIN = {
        'name_or_flags': ['--admin'],
        'options': {'action': 'store_true', 'help': '是否是LDAP Admin 默认是普通用户,加上该参数是管理员'}
    }

    OU_NAME = {
        'name_or_flags': ['-ou', '--ou_name'],
        'options': {'action': 'store', 'help': '操作的 OU 名'}
    }

    SECONDARY_GROUP_NAME = {
        'name_or_flags': ['-se', '--secondary_name'],
        'options': {'action': 'store', 'help': '操作的二级组名'}
    }

    ACTION_LEVEL = {
        'name_or_flags': ['-l', '--level'],
        'options': {'action': 'store', 'type': int, 'help': '操作级别'}
    }

    CUS_ACTION = {
        'name_or_flags': ['-ac', '--action'],
        'options': {'action': 'store', 'help': '执行什么操作 包含 append,remove'}
    }

    USER_TYPE = {
        'name_or_flags': ['-t', '--user_type'],
        'options': {'action': 'store', 'type': int, 'help': '用户的类型 1: Linux 2: Wiki'}
    }

    RE_USER_NAME = {
        'name_or_flags': ['-re', '--new_user'],
        'options': {'action': 'store', 'help': '新用户名称'}
    }

    ATTRIBUTE_NAME = {
        'name_or_flags': ['-an', '--attribute_name'],
        'options': {'action': 'store', 'help': '准备更改的属性名称'}
    }

    ATTRIBUTE_VALUE = {
        'name_or_flags': ['-av', '--attribute_value'],
        'options': {'action': 'store', 'help': '准备更改的属性值'}
    }
  • 3
    点赞
  • 16
    收藏
    觉得还不错? 一键收藏
  • 4
    评论

“相关推荐”对你有帮助么?

  • 非常没帮助
  • 没帮助
  • 一般
  • 有帮助
  • 非常有帮助
提交
评论 4
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值