python怎么实现redis双链表

编辑: admin 分类: mysql 发布时间: 2023-06-10 来源:互联网

redis 双链表

特点:

  • len: O(1),获取链表长度

  • head: O(1), 头部第一个节点

  • tail: O(1) 尾部第一个节点

  • 无环: 非循环链表

  • void *: 存储任意类型数据。 (动态语言天然自带)

2.双链表API接口

创建/销毁/初始化:

  • listCreate

  • listEmpty

  • listRelease

添加节点/删除节点:

  • listAddNodeHead

  • listAddNodeTail

  • listInsertNode

  • listDelNode

实现迭代器/正向/反向遍历:

  • listGetIterator

  • listReleaseIterator

  • listRewind

  • listRewindTail

  • listNext
    list复制,查找,旋转合并操作:

  • listDup

  • listSearchKey

  • listIndex

  • listRotateTailToHead

  • listRotateHeadToTail

  • listJoin

源码

3.使用python实现redis双链表的api

  • 参考redis list定义节点和DLinkList

  • python动态语言需要手动管理内存申请释放.

  • 使用生成器, 偷懒式实现正向反向遍历.

"""
参考redis adlist.c. 实现双链表相关的api
    
        head           tail
None    <-     <-      <-
        1       2       3       
        ->      ->     ->       None    

len:3
"""
import copy
from typing import Any

class  Node(object):
    def __init__(self, data) -> None:
        self.next = None
        self.pre = None
        self.data = data
    
    def __str__(self) -> str:
        return f"{self.data}"

class DLinkList(object):
    def __init__(self) -> None:
        self.head = None
        self.tail = None
        self.len = 0
    
    def empty(self) -> None:
        self.head = None
        self.tail = None
        self.len = 0
    
    def add_node_head(self, data=None) -> Node:
        """[头插法]

        Args:
            data ([type], optional): [description]. Defaults to None.
        """
        new_node = Node(data=data)
        if self.len == 0:
            # 链表为空. 处理头/尾指针
            self.tail = new_node
            self.head = new_node
        else:
            # 插入新节点
            new_node.next = self.head
            self.head.pre = new_node
            self.head = new_node
        self.len += 1
        return new_node
    
    def add_node_tail(self, data: Any=None) -> Node:
        """[尾插法]

        Args:
            data ([type], optional): [description]. Defaults to None.
        """
        new_node = Node(data=data)
        if self.len == 0:
            # 链表为空. 处理头/尾指针
            self.tail = new_node
            self.head = new_node
        else:
            # 插入新节点
            new_node.pre = self.tail
            new_node.next = self.tail.next
            self.tail.next = new_node
            # 更新尾指针
            self.tail = new_node
        self.len += 1
        return new_node
    
    def insert_node(self, old_node: Node=None, data: Any=None, after: bool=False) -> None:
        """[插入新节点, 在旧节点的前/后]

        Args:
            old_node (Node, optional): [旧节点]. Defaults to None.
            data (Any, optional): [新数据]. Defaults to None.
            after (Bool, optional): [是否在之后插入]. Defaults to False.
        """
        assert self.len, f"self.len({self.len}) must > 0"
        new_node = Node(data=data)
        if after:
            new_node.pre = old_node
            new_node.next = old_node.next
            old_node.next.pre = new_node
            old_node.next = new_node
            if self.tail == old_node:
                self.tail = new_node
        else:
            new_node.pre = old_node.pre
            new_node.next = old_node
            old_node.pre.next = new_node
            old_node.pre = new_node
            if self.head == old_node:
                self.head = new_node
        self.len += 1
    
    def del_node(self, node: Node) -> None:
        """[删除节点]

        Args:
            node (Node): [description]
        """
        assert self.len, "DLinklist is None"
        if not node:
            return
        if node == self.head:
            self.head = node.next
        else:
            # 1.处理next
            node.pre.next = node.next
        
        if node == self.tail:
            self.tail = node.pre
        else:
            # 2.处理pre
            node.next.pre = node.pre
        
        node.pre = None
        node.next = None
        del node 
        self.len -= 1

    def next(self, reversed:bool=False):
        """[获取生成器]

        Args:
            reversed (bool, optional): [description]. Defaults to False.

        Returns:
            [type]: [description]
        """
        if reversed:
            return self._reverse_next()
        return self._next()

    def _reverse_next(self):
        """[反向迭代, 使用生成器记录状态]

        Yields:
            [type]: [description]
        """
        cur = self.tail
        idx = self.len - 1
        while cur:
            yield (idx, cur)
            idx -= 1
            cur = cur.pre

    def _next(self):
        """[正向迭代, 使用生成器记录状态]]

        Yields:
            [type]: [description]
        """
        cur = self.head
        idx = 0
        while cur:
            yield (idx, cur)
            idx += 1
            cur = cur.next
    
    def dup(self):
        return copy.deepcopy(self)
    
    def find(self, key: Any=None) -> tuple:
        if not key:
            return
        
        g = self.next()
        for idx, node in g:
            if node.data == key:
                return idx, node
        return -1, None
    
    def rotate_tail_to_head(self) -> None:
        """[正向旋转节点]
        移动尾节点,插入到头部
        """
        assert self.len >= 2, "dlist len must >=2"
        head = self.head
        tail = self.tail
        # remove tail
        self.tail = tail.pre
        self.tail.next = None
        # move to head 
        tail.next = head
        tail.pre = head.pre
        self.head = tail

    def rotate_head_to_tail(self) -> None:
        """[反向旋转节点]
        移动头点,插入到尾部
        """
        assert self.len >= 2, "dlist len must >=2"
        head = self.head
        tail = self.tail
        # remove head
        self.head = head.next
        self.head.pre = None
        # move to tail
        tail.next = head
        head.pre = tail
        self.tail = head
        self.tail.next = None

    def join(self, dlist: Any=None):
        """[合并双链表]

        Args:
            dlist (Any, optional): [description]. Defaults to None.
        """
        pass

    def __str__(self) -> str:
        ans = ""
        for idx, node in self.next(reversed=False):  
            ans += f"idx:({idx}) data:({node.data})n"
        return ans


def test_add_node(dlist:DLinkList=None):
    n = 5
    for i in range(n):
        dlist.add_node_tail(data=i)
        # dlist.add_node_head(data=i)
    print(dlist)

def test_del_node(dlist:DLinkList=None):
    i = dlist.len
    while i: 
        node = None
        dlist.del_node(node)
        i -= 1
    print(dlist)
    
def test_insert_node(dlist:DLinkList=None):
    # dlist.insert_node(old_node=old_node, data=100, after=True)
    # print(dlist, id(dlist))
    # nlist = dlist.dup()
    # print(nlist, id(nlist))
    idx, fnode = dlist.find(1)
    print('find node:', idx, fnode)
    dlist.insert_node(fnode, 100, after=True)
    print("insert after")
    print(dlist)
    dlist.insert_node(fnode, 200, after=False)
    print("insert before")
    print(dlist)

def test_rotate(dlist:DLinkList=None):
    print("move head to tail")
    dlist.rotate_head_to_tail()
    print(dlist)

    print("move tail to head")
    dlist.rotate_tail_to_head()
    print(dlist)

def test_join(dlist:DLinkList=None, olist:DLinkList=None):
    print("join list")
    nlist = dlist.join(olist)
    print(nlist)


def main():
    dlist = DLinkList()
    test_add_node(dlist)
    # test_del_node(dlist)
    # test_insert_node(dlist)
    test_rotate(dlist)

if __name__ == "__main__":
    main()
登录后复制【感谢龙石为本站提供数据底座技术支撑http://www.longshidata.com/pages/government.html】