scrapy之连接保存数据

238阅读 · 0评论 · 2020/05/01发布   前往评论

之前说了关于scrapy的存储。但是我们在实际中项目中肯定是要连接到连接sqlite和mysql数据库的。下面我们就来说说连接mysql的具体方法

基于sqlite的储存

class SqlitePipeline(object):
    @classmethod
    def from_settings(cls, settings):
        """
        可以通过配置DB_NAME 来定义数据库的名字
        :param settings:
        :return:
        """
        db_name = settings['DB_NAME']
        return cls(db_name)

    def __init__(self, db_name):
        if not db_name:
            db_name="DB"
        self.db = sqlite3.connect(f"{db_name}.sqlite")
        self.cursor = self.db.cursor()

    def close_spider(self, spider):
        """
        开关sqlite
        :param spider: 
        :return: 
        """
        self.cursor.close()
        self.db.close()
        self.cursor = None
        self.db = None

    def process_item(self, item, spider):
        """
        具体设置
        :param item: 
        :param spider: 
        :return: 
        """
        table_name = type(item).__name__  # 获得对象的类名
        lie_list = []
        insert_lie_list = []
        insert_value_list = []
        for key, value in item.items():
            if isinstance(value, int):
                lie_type = "INTEGER"
            elif isinstance(value, float):
                lie_type = "FLOAT"
            elif isinstance(value, str):
                lie_type = "VARCHAR(255)"
            elif isinstance(value, list):
                lie_type = "VARCHAR(255)"
                value = ",".join(value)
            if "href" in key:
                lie_type += " UNIQUE"  # 规定带有href的不能重复
            insert_lie_list.append(key)
            insert_value_list.append(value)
            lie_list.append(f"{key} {lie_type}")
            table_lastpart = ",".join(lie_list)
        sql1 = f"""CREATE TABLE IF NOT EXISTS {table_name} (id INTEGER  PRIMARY KEY AUTOINCREMENT,{table_lastpart})"""
        self.cursor.execute(sql1)
        try:
            sql2 = f"""INSERT INTO {table_name} ({",".join(insert_lie_list)}) VALUES ({("?," * len(insert_lie_list))[:-1]})"""
            self.cursor.execute(sql2, insert_value_list)
            self.db.commit()
        except sqlite3.OperationalError as e:
            print("错误类型是:", e)
        return item

连接mysql数据库

class AsynMysqlPipeline(object):
    """
    连接数据库,创建表,拆入数据
    """
    def __init__(self):
        params = dict(
            host="127.0.0.1",
            port=3306,
            db='xiaohua',
            user='root',
            passwd='123456',
            charset='utf8',
            cursorclass=pymysql.cursors.DictCursor
        )

        # 创建一个数据库连接池对象,这个连接池中可以包含多个connect连接对象。
        # 参数1:操作数据库的包名
        # 参数2:链接数据库的参数
        db_connect_pool = adbapi.ConnectionPool('pymysql', **params)
        self.dbpool = db_connect_pool
        result = self.dbpool.runInteraction(self.table)
        # 给result绑定一个回调函数,用于监听错误信息
        result.addErrback(self.error)

    def close_spider(self, spider):
        pass

    # 线面这两步分别是数据库的插入语句,以及执行插入语句。这里把插入的数据和sql语句分开写了,跟何在一起写效果是一样的
    def insert(self, cursor, item):
        sql2 = """insert into xiaohua (title,img_src,who) values (%s,%s,%s);"""
        cursor.execute(sql2, (item["title"], item['img_src'], 'heliyi'))

    def error(self, reason):
        print('--------', reason)

    def table(self, cursor):
        sql1 = """create table if  not exists xiaohua(
                title varchar (200),
                img_src varchar (255),
                who varchar (20)
                );"""
        cursor.execute(sql1)

    def process_item(self, item, spider):
        """
        在连接池中,开始执行数据的多线程写入操作。
        :param item:
        :param spider:
        :return:
        """
        # 参数1:在线程中被执行的sql语句
        # 参数2:要保存的数据
        result = self.dbpool.runInteraction(self.insert, item)
        # 给result绑定一个回调函数,用于监听错误信息
        result.addErrback(self.error)
        return item

最后记得在setting中开启AsynMysqlPipeline。




本文作者: 天行者
发布时间: 2020年05月01日 - 16:02
最后更新: 2020年05月01日 - 16:02
转载请保留原文链接及作者


登录 后回复

共有0条评论