对Python高阶函数装饰器的理解和实战

奋斗吧
奋斗吧
擅长邻域:未填写

标签: 对Python高阶函数装饰器的理解和实战 博客 51CTO博客

2023-05-10 18:24:06 73浏览

对Python高阶函数装饰器的理解和实战,在学习装饰器之前我们需要先了解闭包闭包(closure)是一个概念,它可以用来描述一种特殊的函数。在Python中,当一个函数定义在另一个函数内部时,且该函数引用了外部函数中的变量或参数,那么这个函数就是一个闭包。在闭包中,外部函数中的变量或参数可以被内部函数引用,并且在外部函数已经执行完毕后,这些变量或参数的值仍然可以被内部函数访问和使用。具体地说,闭包是由一个函数和它所引用的自由变量组成的

在学习装饰器之前我们需要先了解闭包

闭包(closure)是一个概念,它可以用来描述一种特殊的函数。在 Python 中,当一个函数定义在另一个函数内部时,且该函数引用了外部函数中的变量或参数,那么这个函数就是一个闭包。在闭包中,外部函数中的变量或参数可以被内部函数引用,并且在外部函数已经执行完毕后,这些变量或参数的值仍然可以被内部函数访问和使用。

具体地说,闭包是由一个函数和它所引用的自由变量组成的封装体。自由变量是指在内部函数中,但是在外部函数中定义的变量或参数。在闭包中,内部函数可以访问外部函数中的变量和参数,并且在外部函数执行完毕后,这些变量和参数的值仍然可以被内部函数访问和使用。

闭包的主要作用是可以将一些变量或参数在多个函数之间进行共享。例如,当一个函数需要访问某个变量或参数,但是这个变量或参数并不是该函数的局部变量或参数时,可以使用闭包来实现变量或参数的共享。

示例:

def outer_function(x):
    def inner_function(y):
        return x + y
    return inner_function

closure = outer_function(10)
result = closure(5)
print(result)  # 输出 15


那么装饰器和闭包有什么联系呢

闭包和装饰器都是 Python 中的高级特性,它们之间有一定的联系,实际上装饰器本身就是一种应用了闭包的技术。

闭包是指一个函数内部定义的函数,可以访问其外部函数的变量或者参数。闭包可以用来实现函数的嵌套和封装,也可以用来实现一些高级特性,比如装饰器。

装饰器是一种语法糖,它可以让我们在不改变函数代码的情况下,对函数进行增强、修改或者扩展功能。装饰器可以在函数的定义前使用 @ 符号,将一个装饰器函数应用到函数上面,从而实现函数的装饰。

在装饰器的实现中,通常会使用闭包来实现。装饰器函数接收一个函数作为参数,然后返回一个新的函数,这个新函数就是对原函数进行增强或修改后的函数。在闭包中,装饰器函数可以访问被装饰的函数,同时也可以访问一些其他变量或参数,从而实现对函数的装饰或增强。


通过演示来理解装饰器

需求,一个加法函数,想增强它的共嫩不过,能够输出被调用过以及调用的参数信息

def add(x, y):
    return x+y

增加信息输出功能

def add(x,y):
    print("call add, x+y")  # 日志输出到控制台
    return x+y

上面的加法函数是完成了需求,但是有以下缺点:

打印语句的耦合太高

加法函数属于业务功能,而输出信息的功能,属于费业务功能代码,不该放在业务函数加法中

后面随着功能增加,这种侵入式的代码风格设计很显然是不合适的


做到了业务功能分离,但是fn函数调用传参是个问题:

def add(x,y):
    return x+y

def logger(fn, x, y):
    print("begin") # 增强的输出
    x = fn(x, y)
    print("end") # 增强的功能
    return x

print(logger(add, 4, 5))

对Python高阶函数装饰器的理解和实战_celery


为了解决传输的问题,通过可变参数进一步改变

def add(x, y):
    return x+y

def logger(fn, *args, **kwargs):
    print("begin")
    x = fn(*args, **kwargs)
    print("end")
    return x

print(logger(add, 5, y=60))


进一步演化

def logger(fn):
    def _logger(*args, **kwargs):
        print('before')
        ret = fn(*args, **kwargs)
        print('after')
        return ret

    return _logger


@logger         # 装饰器语法糖,相当于add1 = logger(add1)
def add1(x, y):  
    return x + y


print(add1(4, 1000))


对简单装饰器的要求

装饰器(无参)

1,它是一个函数

2,函数作为它的参数

3,返回值也是一个函数

4,可以用@functionname方式,简化调用


装饰器和高阶函数

装饰器是高阶函数,但装饰器是对传入函数的功能的装饰(功能)


示例:函数执行耗时打印

import datetime
import time


def logger(fn):
    def wrap(*args, **kwargs):
        # before 功能增强
        print("args={}, kwargs={}".format(args, kwargs))
        start = datetime.datetime.now()
        # 参数解构
        ret = fn(*args, **kwargs)
        # after功能增强
        duration = datetime.datetime.now() - start
        print("funtion {} took {}s".format(fn.__name__, duration.total_seconds()))
        return ret

    return wrap


@logger  # 相当于add = logger(add)
def add(x, y):
    print("======call add ========")
    time.sleep(2)
    return x + y


print(add(4, y=7))


对Python高阶函数装饰器的理解和实战_封装_02


带参装饰器

需求:对上面的函数进行改造,获取函数的执行时长,对时长超过阈值的函数记录下,这个阈值时可变的

def logger(t):
    def __logger(fn):
        def wrap(*args, **kwargs):
            # before 功能增强
            print("args={}, kwargs={}".format(args, kwargs))
            start = datetime.datetime.now()
            # 参数解构
            ret = fn(*args, **kwargs)
            # after功能增强
            duration = (datetime.datetime.now() - start).total_seconds()
            # 阈值,函数耗时大于3秒给记录下
            print("so slow") if duration > t else print("so fast")  # 三元表达式
            return ret

        return wrap

    return __logger


@logger(3)  # 相当于add = logger(3)(add)
def add(x, y):
    print("======call add ========")
    time.sleep(5)
    return x + y


print(add(4, y=7))


带参装饰器的要求

带参装饰器

1,它是一个函数

2,函数作为它的形参

3,返回值是一个不带参的装饰器函数

4,使用@functionname (参数列表) 方式调用

5,可以看作在装起外层又加了一层函数


对带参装饰器示例进一步演进

import datetime
import time


def logger(t, func1=lambda name, duration: print('{} took {}s'.format(name, duration))):
    def __logger(fn):
        def wrap(*args, **kwargs):
            # before 功能增强
            print("args={}, kwargs={}".format(args, kwargs))
            start = datetime.datetime.now()
            # 参数解构
            ret = fn(*args, **kwargs)
            # after功能增强
            duration = (datetime.datetime.now() - start).total_seconds()
            # 阈值,函数耗时大于3秒给记录下
            func1(fn.__name__, duration) if duration > t else print("so fast")  # 三元表达式
            return ret

        return wrap

    return __logger


@logger(3)  # 相当于add = logger(3)(add)
def add(x, y):
    print("======call add ========")
    time.sleep(5)
    return x + y


print(add(4, y=7))

将增加功能打印功能抽成一个函数作为参数传入,那么我func1可以是print 打印记录,我还可以传入func2这个可以是mysql数据库记录这个耗时记录,还可以传入func3,func4....你想加的功能加进去,这样更加灵活


装饰器实战

需求:基于flask开发的发布系统,在开发初期,我们是通过在manager.py编写命令函数(当然这个文件名可能叫其他的,总之你在执行flask cli命令前需要设置这个环境变量才可以执行 

export FLASK_APP="manager"

),通过flask cli命令来执行一些业务操作,但是有些重要的业务操作,比如清档,买机器,发布更新之类会变更数据的操作业务方想要我们在操作的时候能加一个google auth code验证,在真正执行操作逻辑的时候有这个验证,减少失误,但是我们在manager.py编写了很多命令函数,不可能在每个命令函数上都添加auth code验证判断代码,这样做灵活性太低,所以基于这个问题,我们可以使用装饰器功能,对需要做二次验证的命令函数去添加装饰器修饰


def check_authcode(function):
    """
    装饰器方法,authcode验证
    :param function:
    :return:
    """

    def wrapper(*args, **kwargs):  # *args是一个数组,**kwargs是一个字典
        secret_code = input("请输入authcode:   ")
        if not secret_code:
            print("authcode必须输入")
            exit(1)
        code_list = AuthCode().get_code()
        print(code_list)
        if secret_code not in code_list:
            print('secret code check fail,cannot purchase')
            exit(1)
        return function(*args, **kwargs)

    return wrapper


AuthCode类是这么封装的

import base64
import hashlib
import hmac
import time
import datetime
import random as _random
import io

'''

'''


class AuthCode:
    def byte_secret(self, secret):
        missing_padding = len(secret) % 8
        if missing_padding != 0:
            secret += '=' * (8 - missing_padding)
        return base64.b32decode(secret, casefold=True)

    def int_to_bytestring(self, i, padding=8):
        result = bytearray()
        while i != 0:
            result.append(i & 0xFF)
            i >>= 8
        return bytes(bytearray(reversed(result)).rjust(padding, b'\0'))

    # 根据约定的密钥计算当前动态密码
    def generate_otp(self, secret, timestamp=None):
        if not timestamp:
            timestamp = datetime.datetime.now()
        for_time = timestamp
        i = time.mktime(for_time.timetuple())
        intput = int(i / 30)
        digest = hashlib.sha1
        digits = 6
        if intput < 0:
            raise ValueError('input must be positive integer')
        hasher = hmac.new(self.byte_secret(secret), self.int_to_bytestring(intput), digest)
        hmac_hash = bytearray(hasher.digest())
        offset = hmac_hash[-1] & 0xf
        code = ((hmac_hash[offset] & 0x7f) << 24 |
                (hmac_hash[offset + 1] & 0xff) << 16 |
                (hmac_hash[offset + 2] & 0xff) << 8 |
                (hmac_hash[offset + 3] & 0xff))
        str_code = str(code % 10 ** digits)
        while len(str_code) < digits:
            str_code = '0' + str_code
        return str_code

    # 随机生成一个base32密钥
    def random_base32(self, length=16, random=_random.SystemRandom(),
                      chars=None):
        if chars is None:
            chars = list('ABCDEFGHIJKLMNOPQRSTUVWXYZ234567')
        return ''.join(
            random.choice(chars)
            for _ in range(length)
        )

    # 生成二维码地址
    def getQRCodeGoogleUrl(self, name, secret):
        return "otpauth://totp/" + name + "?secret=%s" % secret

    # 生成二维码base64格式
    # def genQr64(u):
    #     # # 原文链接:https: // blog.csdn.net / weixin_44070137 / article / details / 107097403
    #     qr = qrcode.make(u)
    #     buf = io.BytesIO()
    #     qr.save(buf)
    #     img_buf = buf.getvalue()
    #     img_stream = base64.b64encode(img_buf)
    #     return img_stream

    # 从数据库中读取某用户的code ,如果没有则生成code并写入数据库
    # def getcode(user, renew=False):
    #     r = mongo_googlecode_col_googlecode.find_one({user: {"$exists": True}})
    #     if renew or not r:  # 强制更新 或 查无此人
    #         print('生成新code', user)
    #         code = random_base32()
    #         mongo_googlecode_col_googlecode.update_one({user: {"$exists": True}}, {"$set": {user: code}}, upsert=True)
    #         # 写入后重读
    #         r = mongo_googlecode_col_googlecode.find_one({user: {"$exists": True}})
    #         return r[user]
    #     else:
    #         return r[user]

    def get_code(self):
        """
        使用说明
        使用random_base32 生成 密钥 sKey
        使用generate_otp(sKey) 生成 code 时间戳默认为当前,一般情况要传递30s 60s 90s 120s 之前 共计五个时间戳来生成code
        """
        print()
        # sKey = getcode('guoguo')
        # sKey = random_base32()
        # sKey = '7V37KCMUFFXZ5LF7'
        # print(sKey)
        # now = datetime.datetime.now()
        # now_30s = now - datetime.timedelta(seconds=30)
        # now_60s = now - datetime.timedelta(seconds=60)
        # now_90s = now - datetime.timedelta(seconds=90)
        # now_120s = now - datetime.timedelta(seconds=120)
        # codelist = [generate_otp(sKey, now),
        #             generate_otp(sKey, now_30s),
        #             generate_otp(sKey, now_60s),
        #             generate_otp(sKey, now_90s),
        #             generate_otp(sKey, now_120s)]
        # print(codelist)

        sKey = 'xxxx'
        now = datetime.datetime.now()
        now_30s = now - datetime.timedelta(seconds=30)
        now_60s = now - datetime.timedelta(seconds=60)
        now_90s = now - datetime.timedelta(seconds=90)
        now_120s = now - datetime.timedelta(seconds=120)
        codelist = [self.generate_otp(sKey, now),
                    self.generate_otp(sKey, now_30s),
                    self.generate_otp(sKey, now_60s),
                    self.generate_otp(sKey, now_90s),
                    self.generate_otp(sKey, now_120s)]
        return codelist
        # --start-- 通过sKey 生成base64形式的二维码
        # u = getQRCodeGoogleUrl('',sKey)
        # print(u)
        # # u = getQRCodeGoogleUrl('','xxxx')
        # b = genQr64(u)
        # print(b.decode('utf-8'))
        # --end--
        # 048837


通过生成的sKey生成的base64形式的二维码,然后我们在应用商店里下载Authticator这个软件,添加账号,然后通过扫描这个qr二维码生成账号


google auth code二次验证修饰命令函数示例:

@app.cli.command('qingdang')
@click.option('--prj', '-p', default=None)
@check_authcode
def opt_qingdang(proj):
    """
    执行游戏清档操作
    :return:
    """
    if proj not in gameflow.proj_list:
        print("未知的项目名")
        exit(-1)
    res = qingdang(proj)
    statu = res.status_code
    if statu == 200:
        print("清档成功")
    else:
        error_res = res.response
        for i in error_res:
            print(i)



因为装饰器所引发celery worker 执行任务模块路径预期错误的案例:

项目执行目录结构是这样的:

对Python高阶函数装饰器的理解和实战_封装_03

这个是各项目购买机器资源的模块分组

work_buy文件里面放着的是真正的执行购买逻辑的代码

fwxf目录下的work_buy.py是这样的

@c.task
@use_context
def work_buy(args):
    """参数验收""""
    hostname, mongodb = args
    """执行购买逻辑"""


slsk目录下的work_buy.py是这样的

@c.task
@use_context
def work_buy(args):
    """
    整体购买ecs,入库,完善rds信息,建逻辑库,导出xml资源
    :param args:
    :return:
    """
    # 拿到购买参数
    proj, Host_Server_Env, Host_Zone_Id, Host_Zone_spec, db_instance, Rds_name, Rds_Instanceid, Rds_Url, Rds_Type, \
    Rds_rr_name, Rds_rr_Instanceid, Rds_rr_Url, Rds_rr_Type = args
    """执行购买逻辑"""


其中@c.task 是表明该函数是一个celer 任务功能,@use_context 则是向函数增加了一个应用程序上文下推送的功能

@use_context是这样写的

from app import init_app

app = init_app()


def use_context(func):
    """
    应用程序上下文推送装饰器
    :param func:
    :return:
    """
    def wrapper(*args, **kwargs):
        with app.app_context():
            return func(*args, **kwargs)

    return wrapper


在购买蓝图的视图函数里我是这么调用购买逻辑的

@wf.route("/buy/<string:proj>", strict_slashes=False, methods=['GET', 'POST'])
@login_required
def buy_flow(proj):
    """参数验证"""
     # 动态加载各个项目的work_buy模块
        work_buy_module = importlib.import_module('proj.{}.work_buy'.format(proj))
        # 异步调用work_buy函数。注意所有work_buy的参数只能是字符串(序列化),否则celery会报错
        # 因为celery调用过程中使用redis,redis不能直接存储对象
        cid = work_buy_module.work_buy.delay(result[1:])
        return "xxxx"

这个result[1:]就是购买参数


当我执行调用slsk项目购买的时候发现一个报错,那就是说我参数和函数期待是不对等的,首先我怀疑是importlib动态加载引入work_buy模块这里出问题,本来我是执行slsk项目的购买,那么应该执行的是slsk目录下的work_buy而不是fwxf目录下的work_buy,我在代码里添加了打印语句,打印work_buy_module这个变量,发现没有问题

对Python高阶函数装饰器的理解和实战_celery_04


最后我怀疑是不是被别的celery_worker消费调用,因为我们在多台内网机上都部署了项目测试,最后发现是没有问题的,redis中间件代理地址是10.0.0.55:6379 ,我直接登上去看

对Python高阶函数装饰器的理解和实战_内部函数_05

发现都是57的这个ip在用,57就是我在用,所以被别的celery_worker消费这个可能性也排除掉


最后我又检查了celery配置,CELERY_INCLUDE配置项,也没有问题

对Python高阶函数装饰器的理解和实战_封装_06

最终我手动启动celery_worker发现了问题

对Python高阶函数装饰器的理解和实战_flask_07


因为我在项目的work_buy函数上都添加了@use_context装饰器,导致celery woker进程以后只扫描到一个action.app_use_context 的这样的一个异步任务,这样执行调用后会导致celery在加载模块时无法正确识别时哪一个模块是你想真正执行的任务模块


解决:为了解决这个问题需要我们在celery task装饰里添加一个name属性来指定任务的名字,为了不同的任务模板做区分

fwxf目录下的work_buy.py改这样

@c.task(name='fwxf.work_buy')
@use_context
def work_buy(args):
    """参数验收""""
    hostname, mongodb = args
    """执行购买逻辑"""


slsk目录下的work_buy.py是这样的

@c.task(name='slsk.work_buy')
@use_context
def work_buy(args):
    """
    整体购买ecs,入库,完善rds信息,建逻辑库,导出xml资源
    :param args:
    :return:
    """
    # 拿到购买参数
    proj, Host_Server_Env, Host_Zone_Id, Host_Zone_spec, db_instance, Rds_name, Rds_Instanceid, Rds_Url, Rds_Type, \
    Rds_rr_name, Rds_rr_Instanceid, Rds_rr_Url, Rds_rr_Type = args
    """执行购买逻辑"""


重新启动celery_worker

对Python高阶函数装饰器的理解和实战_封装_08

搞定!

好博客就要一起分享哦!分享海报

此处可发布评论

评论(0展开评论

暂无评论,快来写一下吧

展开评论

您可能感兴趣的博客

客服QQ 1913284695