之前几天也写了一部分,只不过看到_op文件太多了,仅以此文章梳理思路,可能会有点乱,勿喷。

代码参考nano-graphrag/nano_graphrag/_op.py at main · gusye1234/nano-graphrag

正文开始: 

首先从utils.py开始

第一个函数貌似是多线程并发,获取当前事件,如果没有,就创建一个。(看来有时间要补一补多线程的知识了)

logger=logging.getLogger("nano-graphrag")#创建一个日志,名为"nano-graphrag"
ENCODER=None

def always_gat_an_event_loop()->asyncio.AbstractEventLoop:
    try:
        #if there is already an event loop,use it.
        loop=asyncio.get_event_loop()#get the current existed loop
    except RuntimeError:
        #if in a sub-thread ,create a new event loop.
        logger.info("Creating a new event loop in a sub-thread")
        loop=asyncio.set_event_loop(loop)
    return loop

第二个,提取第一次完整输出的回答并转化为json格式

参数:接收一个字符串

这里还用到栈的知识,啊,不对,这里python基础不好,pop函数是把列表中的最后一个元素删除,并返回这个数的值,类似于栈中的出栈但不是,stack只是列表的名字。

first_json_start是一个存索引的变量。

python基础又来了,enumerate函数是一个迭代器,为这个s生成每个字符及其索引,就像是去吃饭,给每个点餐的发个号,还不是键值对。

这里提到append函数就要区分一下和extend的区别了,之前一直搞不懂。

append是往列表后面加且仅加一个元素。extend是加一个集合,比如一个列表后面加一个列表

(列表、元组、集合、字典(默认添加键)、字符串)而加dict的多用i一个tems(),values()。

OK,基础到此。流程就是,我接收一个字符串,创建一个列表,一个变量。然后循环这个字符串的每一个字符,如果遇到左括号,添加,并且这是第一个还得保存索引,如果是右括号,看看前面有没有东西,如果有出栈,没有就说明左右括号匹配了,那就从之前的索引到现在的索引切片,提取要的输出换行用空格代替转换为json格式。

def extract_first_complete_json(s:str):
    """Extract the first complete Json object from the string using a stack to track braces"""
    stack=[]
    first_json_start=None

    for  i,char in enumerate(s):
        if char=='{':
            stack.append(i)
            if first_json_start is None:
                first_json_start=i
        elif char=='}':
            if stack:
                start=stack.pop()
                if not stack:
                    first_json_str=s[first_json_start:i+1]
                try:
                    #try to parse the json string
                    return json.loads(first_json_str.replace("\n",""))
                except json.JSONDecoderError as e:
                    logger.error(f"json decoding failed{e}.Attempted string :{first_json_str[:50]}...")
                    return None
                finally:
                    first_json_start=None
    logger.warning("No complete json object found in the input string.")
    return None

第三个比较简单看注释就行了。 

def parse_value(value:str):
    """convert a string value to its appropriate type(int float bool,none string).Work as a more broad 'eval()' """
    value=value.strip()
    if value=="null":
        return None
    elif value=="true":
        return True
    elif value=="false":
        return False
    else:
        #covert to int/float
        try:
            if '.' in float:
                return float(value)
            else:
                return int(value)
        except ValueError:
            #if convert fails,return the raw value.
            return value.strip('"')# Remove surrounding quotes if they exist

第四个,从json格式中提取。先解释一下allow_no_quotes=False,就是一个参数,允许还是不允许有引号 ,后面格式会用。总的来说就是提取三种格式下的输出。返回值。

有双引号,没有双引号,嵌套大括号的

re.finditer是一个迭代器,regex_pattern是一个模式,re.DOTALL是一个标志,它告诉正则表达式引擎.(点号)应该匹配包括换行符在内的任何字符。在默认情况下,.不匹配换行符。由于您的正则表达式可能需要跨越多行来匹配整个值(特别是当值是一个嵌套的对象或数组时)

如果遇到嵌套的还得递归调用这个函数。

def extract_values_from_json(json_string,keys=["reasoning","answer","data"],allow_no_quotes=False):
    """extract key values from a non-standard or malformed json string"""
    extract_values={}
    #enhanced pattern to match both quoted and unquoted values ,as well as nested objects
    """ key:value,
        "key":"value",
        key:{{value}}
    """
    regex_pattern=r'(?P<key>"?\w+"?)\s*:\s*(?P<value>{[^}]*}|".*?"|[^,})+)'
    for match in re.finditer(regex_pattern,json_string,re.DOTALL):
        key=match.group('key').strip('"')
        value=match.group('value').strip()
        #if still be nested,break up
        if value.startswith('{') and value.endswith('}'):
            extract_values[key]=extract_values_from_json(value)
        else:
            #parse the value into appropriate type(int,float...)
            extract_values[key]=parse_value(value)
    if not extract_values:
        logger.warning("No values could be extracted from the string.")
    return extract_values

第五个,把大模型的响应 转换为json

def convert_response_to_json(response:str)->dict:
    """convert response string to json"""
    prediction_json=extract_first_complete_json(response)
    if prediction_json is None:
        logger.info("Attempting to extract values from a non-standard json string")
        prediction_json=extract_values_from_json(response,allow_no_quotes=True)#allow_no_quotes: "   " response generally include '""'
    if not prediction_json:
        logger.error("unable to extract meaningful data from the response.")
    else:
        logger.info("json data is successfully extracted.")
    return prediction_json

接下来几个都比较短合一起了

编码解码,其实本来想用deepseek的结果只有编码,没解码,还得huggingface验证啥的,先埋个雷吧,后面再回来看。

def encode_string_by_tiktoken(content:str,model_name:str="gpt-4o"):
    global ENCODER
    if ENCODER is None:
        ENCODER=tiktoken.encoding_for_model(model_name)
    tokens=ENCODER.encode(content)
    return tokens

def decode_tokens_by_tiktoken(tokens:list[int],model_name:str="gpt-4o"):
    global ENCODER
    if ENCODER is None:
        ENCODER=tiktoken.encoding_for_model(model_name)
    content=ENCODER.decode(tokens)
    return content

七,计算token数,编哈希值。

这个key是一个函数一个可调用的函数(比如 lambda 函数或任何定义了 __call__ 方法的对象),它接受列表中的一个元素作为输入,并返回用于计算令牌大小的字符串。就是使用 key 函数获取一个字符串。如果当前超了最大,那就赶紧停止,没有就继续叠加直到完了。

使用 content.encode() 将输入字符串编码为字节串(因为 MD5 哈希是基于字节计算的,而不是基于字符串)

  1. 调用 md5() 函数(来自 Python 的 hashlib 模块)来计算字节串的 MD5 哈希值。
  2. 使用 hexdigest() 方法将计算出的哈希值转换为一个十六进制字符串。
  3. 将前缀添加到哈希值之前,并返回结果。
def truncate_list_by_token_size(list_data:list,key:callable,max_token_size:int):
    """Truncate a list of data by token size"""
    if max_token_size<=0:
        return []
    tokens=0
    for i,data in enumerate(list_data):
        tokens+=len(encode_string_by_tiktoken(key(data)))#doubted :key(data)
        if tokens>max_token_size:
            return list_data[:i]
    return list_data

def compute_mdhash_id(content,prefix:str=""):
    return prefix+md5(content.encode()).hexdigest()

八.这几个都比较简单,唯一注意的是

  • roles[i%2] 用于交替选择 "user" 和 "assitant"(或应该是 "assistant")角色。因为 i%2 的结果会在 0 和 1 之间交替,所以 roles[i%2] 会在 "user" 和 "assitant" 之间交替。
def write_json(json_obj,file_name):
    with open(file_name,"w",encoding='utf-8') as f:
        json.dump(json_obj,f,indent=2,ensure_ascii=False)

def load_json(file_name):
    if not os.path.exists(file_name):
        print("No file exists.")
        return None
    else:
        with open(file_name,encoding='utf-8') as f:
            return json.loads(f)

def pack_user_ass_to_openai_messages(*args:str):
    roles=["user","assitant"]
    return [
        {"role":roles[i%2],"content":content} for i,content in enumerate(args)
    ]

def is_float_regex(value):
    return bool(re.match(r"^[-+]?[0-9]*\.?[0-9]+$",value))

九,

  • compute_mdhash_id 更适合用于对单个字符串内容进行哈希处理,可能用于验证内容完整性或作为唯一标识符。
  • compute_args_hash 则可以用于生成一组参数的唯一哈希值,这在需要基于参数集合进行缓存、去重或验证时非常有用。
  • 去除分割结果中的空字符串(通过 strip() 方法)
  • 转义某些字符后去除空白字符
    1. 如果 content 是数字类型(整数或浮点数),则直接将其转换为字符串并返回。
    2. 否则,先去除 content 字符串首尾的空格、单引号、和双引号。
    3. 最后,用双引号将处理后的字符串包围起来并返回。
def compute_args_hash(*args):
    return md5(str(args).encode()).hexdigest()

def split_string_by_multi_markers(content:str,makers:list[str])->list[str]:
    """split string by using multiple markers"""
    if not makers:
        return [content]
    results=re.split("|".join(re.escape(marker) for marker in makers),content)
    return [r.strip() for r in results if r.strip()]

def enclose_string_with_quotes(content:Any)->str:
    """enclose a string with quotes"""
    if isinstance(content,numbers.Number):
        return str(content)
    content=content.strip().strip("'").strip('"')
    return f'"{content}"'

十,

两个join一个用,合并成一行,一个用换行符合并成一整个csv字符串。

去除HTML格式和控制符(

  1. NUL(0x00):空字符,通常用于字符串的结束标志。
  2. SOH(0x01):文头/起始标志,用于标记数据包的开始。
  3. STX(0x02):文本开始,用于标记文本数据的开始。
  4. ETX(0x03):文本结束,用于标记文本数据的结束。)
def list_of_list_csv(data:list[list]):
    return '\n'.join(
        [
            ",".join([f"{enclose_string_with_quotes(data_dd)}" for data_dd in data_d] ) for data_d in data
        ]
    )

def clean_str(input:Any)->str:
    """clean an input string by removing HTML and so on."""
    if not isinstance(input,str):
        print("input isn't string.")
        return input
    result=html.unescape(input.strip())
    # https://stackoverflow.com/questions/4324790/removing-control-characters-from-a-string-in-python
    return re.sub(r"[\x00-\x1f\x7f-\x9f]","",result)

十一,

@dataclass一个装饰器,省去__init__这样的函数定义

def __call__用来让类的实例像函数那样被调用

@dataclass
class EmbeddingFunc:
    embedding_dim:int
    max_token_size:int
    func:callable

    async def __call__(self, *args, **kwargs)->np.ndarray:
        return await self.func(*args,**kwargs)

def limit_async_func_call(max_size:int,waitting_time:float=0.0001):
    """add restriction of maximm async calling times for a async func"""
    def final_decro(func):
        """ not using async.Semaphore to avoid use nest-asyncio"""
        __current_size=0
        @wraps(func)
        async def wait_func(*args,**kwargs):
            nonlocal __current_size
            while __current_size>=max_size:
                await asyncio.sleep(waitting_time)
            __current_size+=1
            result=await  func(*args,**kwargs)
            __current_size-=1
            return result
        return wait_func
    return final_decro

这里的多并发真的看不懂呀!!!!!!! 

import asyncio
from functools import wraps

def limit_async_func_call(max_size: int, waiting_time: float = 0.0001):
    """限制异步函数的同时调用次数。"""
    def final_decro(func):
        """不使用 async.Semaphore 以避免嵌套 asyncio 的使用。"""
        __current_size = 0  # 当前正在调用的函数数量

        @wraps(func)
        async def wait_func(*args, **kwargs):
            nonlocal __current_size  # 声明为非局部变量,以便在内部修改
            while __current_size >= max_size:
                await asyncio.sleep(waiting_time)  # 如果达到最大调用次数,则等待
            __current_size += 1  # 增加当前调用计数
            try:
                result = await func(*args, **kwargs)  # 调用原始函数并等待结果
            finally:
                __current_size -= 1  # 无论是否发生异常,都减少调用计数
            return result  # 返回结果

        return wait_func  # 返回装饰后的函数

    return final_decro  # 返回装饰器本身

十二,定义了一个名为 wrap_embedding_func_with_attrs 的高阶函数(也称为装饰器工厂),它接收任意数量的关键字参数(**kwargs),并返回一个装饰器 final_decro。这个装饰器的目的是将一个普通的函数 func 包装成一个 EmbeddingFunc 类的实例,同时将传入的关键字参数作为 EmbeddingFunc 的属性。

def wrap_embedding_func_with_attrs(**kwargs):
    """wrap a function with attributes"""
    def final_decro(func)->EmbeddingFunc:
        new_func=EmbeddingFunc(**kwargs,func=func)
        return new_func
    return final_decro

OKK!utils完成,先去看一下线程多并发再继续写吧。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐