Оглавление:
Это вторая часть руководства по созданию инструмента для скрапинга веб-страниц с помощью Python. Мы будем использовать интеграцию Celery и системы управления задачами.
В части 1 «Создание скрапера RSS-каналов с помощью Python» показано, как можно использовать Requests и Beautiful Soup.
В части 3 этой серии статей «Создание приложения для скрапинга веб-страниц с помощью Python, Celery и Django» я продемонстрирую, как интегрировать инструмент для скрапинга веб-страниц в приложения.
В предыдущей статье я создал простую программу скрапинга RSS-каналов, которая извлекает информацию с помощью Requests и BeautifulSoup (смотрите код на GitHub). Теперь мы будем использовать этот код как основу для создания системы управления задачами и запланированного скрапинга.
- Пояснение
- Терминал 1
- Терминал 2
- Терминал №3
- Терминал 2
- Улучшения
- Создание расписания
- Выполняем задачи
- Терминал 1
- Терминал 2
Следующим логическим шагом в скрапинге данных с веб-сайтов, которые часто меняются (то есть RSS-канала, отображающего X элементов за раз), является регулярный скрапинг. В предыдущем примере парсинга мы использовали командную строку для выполнения кода по команде. Однако это не масштабируемое решение. Чтобы автоматизировать его, мы добавим Celery для создания системы очереди задач с периодом выполнения.
Я буду использовать следующие инструменты:
- Python 3.7+;
- Requests;
- BeautifulSoup 4;
- Текстовый редактор (я использую Visual Studio Code);
- Celery— распределенная очередь задач;
- RabbitMQ— брокер сообщений.
Примечание. Все зависимости библиотеки перечислены в файлах requirements.txtи Pipfile / Pipfile.lock.
Краткое пояснение по Celery
Celery — это система управления задачами, она работает совместно с брокером сообщений для выполнения асинхронных задач.
Выше показано, что инициатор задач (наше приложение для скрапинга веб-страниц) передает информацию о задаче в очередь (Celery) для выполнения. Планировщик (Celery beat) выполняет их как задачи cron, без какой-либо дополнительной обработки или взаимодействий вне запуска приложения Celery.
Статьи
- Создание скрапера RSS-канала с помощью Python
- Автоматический парсинг веб-страниц с помощью Python и Celery (это руководство)
- Создание приложения для парсинга веб-страниц с помощью Python, Celery и Django
Краткое содержание проекта
Вот схема шагов, которые мы предпримем для создания окончательного проекта:
- Установка Celery и RabbitMQ — Celery управляет постановкой задач в очередь и их выполнением, а RabbitMQ обрабатывает сообщения.
- Начало работы с RabbitMQ и обработка логов.
- Создание доказательства концепции «Hello World» с помощью Celery, чтобы убедиться в том, что он работает.
- Регистрация функции скрапинга py с помощью Celery.
- Дальнейшее развитие и управление задачами скрапинга.
- Создание и выполнение расписания для задач скрапинга.
Примечание. Введение в RabbitMQ и Celery довольно длинное, если у вас есть опыт работы с ними, я рекомендую сразу перейти к шагу 4.
Приступаем к работе
Мы начнем с открытия каталога предыдущего проекта, в данном случае это web_scraping_example из предыдущей статьи. Если хотите, его можно клонировать с GitHub.
Примечание. Я использую Ubuntu, поэтому мои команды могут отличаться от ваших. Кроме того, для краткости я пропустил повторение кода с помощью …
Кроме того, требования к проекту могут задавать для этой установки использование pip, как в примере, приведенном ниже.
$ pip install celery
Почему Celery & RabbitMQ?
Мы используем Celery и RabbitMQ, потому что они довольно просты в настройке, тестировании и масштабировании в производственной среде. Хотя мы могли бы выполнять периодические задачи с помощью других библиотек, или просто заданий cron, в целом, я хотел, чтобы в следующей статье этой серии мы основывались на этом.
В долгосрочной перспективе нам будет намного проще, если мы будем использовать что-то, что сможем масштабировать в следующем проекте, а также изучим некоторые ключевые команды и инструменты по мере постепенного увеличения сложности.
Настройка RabbitMQ
Настроить и запустить сервер RabbitMQ в Ubuntu значительно проще, чем в операционной системе Windows. Я буду следовать официальному руководству по установке.
Ниже приведены команды установки для Debian, Ubuntu.
$ sudo apt-get update -y $ sudo apt-get install curl gnupg -y $ curl -fsSl https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc | sudo apt-key add - $ sudo apt-get install apt-transport https $ sudo tee /etc/apt/sources.list.d/bintray.rabbitmq.list <<EOF $ deb https://dl.bintray.com/rabbitmq-erlang/debian bionic erlang $ deb https://dl.bintray.com/rabbitmq/debian bionic main $ EOF $ sudo apt-get update -y $ sudo apt-get install rabbitmq-server -y --fix-missing
Когда я впервые установил RabbitMQ в виртуальной среде, он запустился автоматически. Чтобы проверить, что команда rabbitmq-server работает (ее мы будем использовать при работе с Celery), мне пришлось закрыть службу.
Я также заметил, что в разрешениях по умолчанию для установки указано Only root or rabbitmq should run rabbitmqctl shutdown, что мне показалось странным. Вместо того, чтобы решить эту проблему, я решил просто запустить sudo.
$ sudo rabbitmqctl shutdown
Затем я смог протестировать сервер, используя rabbitmq-server, я привожу команду и вывод ниже.
$ sudo rabbitmq-server
Вывод sudo rabbitmq-server в терминале
К вашему сведению — вы можете завершить команду rabbitmq-server, используя клавиатурную комбинацию Ctrl + C.
Для настройки RabbitMQ в операционной системе Windows требуются дополнительные действия. В официальной документации есть руководство по ручной установке.
Тестирование Celery с RabbitMQ
Создание скрапера RSS-канала с помощью Python
Прежде чем перейти к написанию кода проекта, я обычно начинаю с тестирования базовых примеров в стиле «Hello World», которые есть в пакетах и фреймворках. Это дает мне общее представление о том, чего я могу ожидать, а также несколько команд для терминала, которые нужно добавить в набор инструментов для каждой конкретной технологии.
В данном случае работа с Celery будет сопровождаться их собственным подтверждением концепции «Hello World» в виде задачи, выполняющей базовое добавление. Оно доступно в официальной документации Celery. Я собираюсь вкратце проиллюстрировать его. Однако, если вам нужны пояснения или подробный обзор, пожалуйста, ознакомьтесь с официальной документаций.
Теперь, когда у нас установлен и проверен брокер RabbitMQ, можно приступить к созданию файла tasks.py. Он будет содержать задачи, которые мы будем выполнять, будь то добавление, скрапинг веб-страниц или сохранение пользователей в базе данных. Теперь я внесу изменения в каталог проекта.
$ touch tasks.py
Чтобы создать задачу добавления, мы будем импортировать Celery, и создавать функцию с флагом @app.task, позволяющую Celery workers получать задачу в системе очереди.
# tasks.py from celery import Celery app = Celery('tasks') # определяем название приложения, которое будет использоваться во флаге @app.task # регистрация задачи для приложения def add(x, y): return x + y
Используя add этой задачи, мы можем начать тестирование исполнения. Здесь все может немного запутаться, так как на следующем шаге у меня будут одновременно открыты три терминала.
Я начну с краткого объяснения, затем углублюсь в код и предоставлю снимки экрана.
Пояснение
Чтобы завершить тест, мы будем выполнять задачу Celery с помощью командной строки, импортировав файл tasks.py и вызвав его. Чтобы задачи были получены в очередь, нам нужно, чтобы Celery worker и сервисы RabbitMQ были активными. Сервер RabbitMQ будет действовать как брокер сообщений, в то время как Celery worker будет выполнять задачи.
Я буду обозначать каждый шаг номерами терминалов:
- RabbitMQ
- Celery worker
- Выполнение задачи
Терминал 1
Мы начнем с запуска сервера RabbitMQ в терминале №1.
# RabbitMQ $ sudo rabbitmq-server
Запуск сервера RabbitMQ
Терминал 2
Впоследствии мы можем начать процесс Celery worker в терминале №2. Я добавил подробные настройки для worker, чтобы проиллюстрировать, как будет выглядеть результат.
Примечание: это необходимо выполнить из каталога проекта.
# Celery worker $ celery worker -A tasks -l INFO
Разберем приведенную выше команду:
- celery — пакет, который мы вызываем.
- worker — запуск процесса worker.
- -A tasks — явно объявляем, что нам нужно приложение
- -l INFO- задает наличие подробных событий ведения журнала консоли (нам нужно много деталей).
Чтобы проверить, правильно ли загружается worker, найдите в терминале строку concurrency: 4 (prefork).
Кроме того, мы замечаем, что приложение [tasks] было импортировано вместе с регистрацией задач из файла tasks.py. worker зарегистрировал единственную задачу (1) tasks.add.
Как выполнять математические действия с помощью операторов Python 3
Запуск Celery worker с подробной информацией
Терминал №3
Затем мы можем начать выполнение теста в терминале №3. Я буду выполнять цикл, чтобы проиллюстрировать, что служба worker перехватывает несколько задач. Мы добьемся этого, введя add из файла tasks.py, а затем выполнив цикл for. Примечание. После строки add.delay(i, i) вам нужно будет использовать клавиатурную комбинацию Ctrl + Enter длявыполнения команды.
$ python >>> from tasks import add # pulling in add from tasks.py >>> for i in range(1000): ... add.delay(i, i) # delay calls the task
Теперь вы должны увидеть большой блок вывода в терминале № 3 (выполнение задачи Celery). Это продемонстрирует, что worker получает результат задачи от терминала №2.
Запуск выполнения задачи Celery
Терминал 2
Если мы проверим Celery worker в терминале № 2, процесс, выполняющий задачу add, мы увидим, что он перехватывает каждое из выполнений задачи.
Celery worker, получающий и выполняющий задачи
Теперь мы успешно доказали, что Celery и RabbitMQ установлены правильно. Это помогает заложить основу для других задач, которые мы будем реализовывать (например, скрапинг веб-страниц), демонстрируя, как взаимодействуют Celery, Celery worker и RabbitMQ.
Теперь, когда мы рассмотрели установку и основы, мы перейдем к файлу tasks.py, чтобы создать задачи скрапинга веб-страниц.
Создание tasks.py с помощью Celery
Приведенный выше пример помог проверить процесс, который мы будем использовать для выполнения задач с помощью Celery, а также продемонстрировал, как задачи регистрируются с помощью Celery worker.
Основываясь на приведенном выше примере, мы начнем с создания задач скрапинга. Сейчас я собираюсь отказаться от файла scraping.py, так как он будет просто скопирован в файл tasks.py для простоты.
Я начну с удаления из примера функции def add(x, y) и копирования зависимостей (Requests и BeautifulSoup) вместе с самими функциями.
Примечание: я буду использовать те же функции, но в файле tasks.py.
# tasks.py from celery import Celery import requests # ввод данных from bs4 import BeautifulSoup # парсинг xml import json # экспорт в файлы app = Celery('tasks') # функция сохранения def save_function(article_list): with open('articles.txt', 'w' as outfile: json.dump(article_list, outfile) # функция скрапинга def hackernews_rss(): article_list = [] try: # выполняем запрос, разбираем данные с помощью XML # разбираем данные в BS4 r = requests.get('https://news.ycombinator.com/rss') soup = BeautifulSoup(r.content, features='xml') # выбираем только "item", которые нам нужны из данных articles = soup.findAll('item') # для каждого "item" разбираем его в список for a in articles: title = a.find('title').text link = a.find('link').text published = a.find('pubDate').text # создаем объект "article" с данными # из каждого "item" article = { 'title': title, 'link': link, 'published': published } # добавляем "article_list" с каждым объектом "article" article_list.append(article) # после цикла вносим сохраненные объекты в файл .txt return save_function(article_list) except Exception as e: print('The scraping job failed. See exception: ') print(e)
Манипуляции с датой и временем в Python с помощью Pandas
Упомянутые выше функции парсинга веб-страниц теперь доступны в файле tasks.py вместе с их зависимостями. Следующим шагом является регистрация задач в приложении Celery, для этого просто размещаем @app.task над каждой функцией.
# tasks.py... # то же, что и выше @app.task def save_function(article_list): ...@app.task def hackernews_rss():
Дальнейшее развитие функций скрапинга
Хотя функции скрапинга, которые мы определили, доказали свою эффективность для извлечения данных из RSS-канала, у нас все еще есть возможности для улучшения. Ниже я в общих чертах обрисовал, что мы будем изменять в наборе инструментов для скрапинга, прежде чем автоматизируем его.
Улучшения
- Сохранение результатов в файлах .json с отметками даты и времени.
- Добавление даты и времени created_at для каждой статьи.
- Добавление строки source, если мы хотим извлекать данные и с других сайтов.
Упомянутые выше изменения невелики, так как мы уже выполнили основную часть работы в рамках первой статьи о реализации. Хотя это несущественное изменение, .json будет читать немного удобнее, чем .txt. Два дополнительных столбца также помогут сделать его более «масштабируемым» при добавлении других каналов, а также при последующем анализе данных.
Давайте начнем с save_function: обновим ее для вывода файла .json и добавим временную метку, чтобы улучшить качество при обращении к ранее извлеченным данным.
# tasks.py from datetime import datetime # for time stamps ... def save_function(articles_list): # временная метка и имя файла timestamp = datetime.now().strftime('%Y%m%d-%H%M%S') filename = 'articles-{}.json'.format(timestamp) # создаем файл статьи с временной меткой with open(filename, 'w').format(timestamp) as outfile: json.dump(article_list, outfile)
Я использую функцию datetime.now().strftime(…) для создания метки даты и времени, используя .format(timestamp).
Переходя к двум изменениям в функции hackernews_rss() — мы добавим некоторую информацию об источнике и отметку времени created_at. Это два простых изменения, которые помогут нам оставаться в курсе, если мы добавим дополнительные функции скрапинга.
# tasks.py ... def hackernews_rss(): ... for a in articles: ... article = { ... 'created_at': str(datetime.now()), 'source': 'HackerNews RSS' } ...
Указанные выше изменения иллюстрируют добавление столбцов created_at и source. К вашему сведению — если вы опустите переход str(), вы не сможете его выполнить из-за Object of type datetime is not JSON serializable.
Планирование задач с помощью Celery
Теперь мы будем использовать возможности Celery по планированию задач, опираясь на то, что beat_schedule поставляется из коробки. Это позволяет нам регистрировать задачи на определенное время с помощью агента планирования.
Отличное описание примеров планирования можно найти в официальной документации. Я также включил несколько дополнительных примеров расписания в файле tasks.py врепозитории GitHub.
Я собираюсь выполнять задачу скрапинга каждую минуту, так как это продемонстрирует, как Celery worker взаимодействует с запланированными задачами. Это не приведет к разнице в данных, поскольку используемый нами RSS-канал не обновляется ежеминутно.
Моя цель в этой демонстрации — показать выходные файлы статей и простое расписание задач.
Создание расписания
# tasks.py ... from celery.schedules import crontab # scheduler # выполнение запланированных задач app.conf.beat_schedule = { # выполняется каждую минуту 'scraping-task-one-min': { 'task': 'tasks.hackernews_rss', 'schedule': crontab() } } ...
Приведенная выше конфигурация будет регистрировать расписание задач в самом приложении Celery. После запуска мы сможем вызвать расписание Celery, чтобы дважды проверить, что он поставил в очередь.
Выполняем задачи
Теперь, когда расписание создано, пришло время включить сервер RabbitMQ и запустить процессы Celery worker.
В этом примере мы будем использовать две вкладки терминала:
- Сервер RabbitMQ
- Celery worker
Терминал 1
Чтобы запустить сервер RabbitMQ (наш брокер сообщений), мы будем использовать ту же команду, что и раньше.
Примечание: мне также нравится дважды проверять, что узел, созданный при запуске, выключен, поскольку он не запускается с журналом и выдает ошибку, если мы сначала не отключим его.
$ sudo rabbitmqctl shutdown $ sudo rabbitmq-server
Теперь вы должны видеть результат, аналогичный предыдущему (смотрите скриншот экрана, приведенный ниже).
Запуск сервера RabbitMQ
После запуска сервера RabbitMQ мы можем начать с терминала №2.
Терминал 2
Мы немного изменим команду, так как теперь она будет включать в себя обозначение -Bдля того, какие вызовы worker выполняет расписание.
$ celery -A tasks worker -B -l INFO
Вывод консоли будет иллюстрировать запуск приложения и (в зависимости от того, какое у вас расписание) будет выводить информацию о выполнении задачи.
На скриншоте ниже:
- Зарегистрированы [tasks].
- Начинается расписание
- Наш worker MainProcess получает задачу Received task: tasks.hackernews_rss.
- Запускается ForkPoolWorker и выполняет задачу, а затем возвращает результат.
Выполнение запланированной задачи
К вашему сведению — мы можем остановить выполнение запланированной задачи с помощью клавиатурной комбинации Ctrl + C, так как это будет работать бесконечно.
Теперь, когда мы успешно выполнили задачу save_function(), созданный ранее файл вывел файл .json.
Вывод .json файла задачи
Заключение
Мы успешно расширили наш простой инструмент для скрапинга веб-страниц, чтобы создать расписание. Это гарантирует, что нам больше не нужно вручную выполнять задачи скрапинга, и мы можем просто «включить и оставить». После планировки задач, проект сможет скрапить сайты на предмет наличия данных, которые изменяются по заданному расписанию (скажем, каждые 15 минут), и каждый раз возвращать новые данные.
Так что же нам делать дальше?
В третьей части этой серии статей я продемонстрирую приложение Django с интеграцией Celery и скрапингом веб-страниц. Это будет отличный пример веб-приложения, которое извлекает данные на сайт и заполняет его информацией. Нашим конечным продуктом будет агрегатор новостей, который будет извлекать информацию сразу из нескольких RSS-каналов.
Источник: www.internet-technologies.ru