yellyex #1941 9 часов назад (изменено) Qreeq написал 3 часа назад: fitonyash написал 6 часов назад: Qreeq написал 7 часов назад: Надо признать парсер получился как минимум рабочий. Или тут есть какая-то фишка, которую я не понимаю... Надо соню прогнать @yellyex а ну ка прогони в своей проге Соню по полит топану, братанчик Там надо долго ждать. Плюс ни квен и ни дипсик не грузят тонны текста. Тут нужно использовать агента типа курсора, а у меня подписки нету. Так что либо сами, либо никак. Изменено 8 часов назад пользователем yellyex Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
Gomosek #1943 8 часов назад yellyex написал 1 минуту назад: Qreeq написал 3 часа назад: fitonyash написал 5 часов назад: Qreeq написал 7 часов назад: Надо признать парсер получился как минимум рабочий. Или тут есть какая-то фишка, которую я не понимаю... Надо соню прогнать @yellyex а ну ка прогони в своей проге Соню по полит топану, братанчик Там надо долго ждать. Плюс ни квен и ни дипсик не грузят тонны текста. Тут нужно использовать агента типа курсора, а в меня подписки нету. Так что либо сами, либо никак. Остановись. Они тебе нихуя скинуть не могут, а ты им от души все делаешь. Сделай это платной услугой. Первый стартап продоты Цитата Поделиться сообщением Ссылка на сообщение
yellyex #1944 8 часов назад (изменено) Gomosek написал 24 минуты назад: yellyex написал 27 минут назад: Qreeq написал 3 часа назад: fitonyash написал 6 часов назад: Qreeq написал 7 часов назад: Надо признать парсер получился как минимум рабочий. Или тут есть какая-то фишка, которую я не понимаю... Надо соню прогнать @yellyex а ну ка прогони в своей проге Соню по полит топану, братанчик Там надо долго ждать. Плюс ни квен и ни дипсик не грузят тонны текста. Тут нужно использовать агента типа курсора, а в меня подписки нету. Так что либо сами, либо никак. Остановись. Они тебе нихуя скинуть не могут, а ты им от души все делаешь. Сделай это платной услугой. Первый стартап продоты Да я бескорыстный парень 🤗🤗🤗☺️☺️☺️ Изменено 8 часов назад пользователем yellyex Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
[blindfold] #1945 8 часов назад yellyex написал 7 минут назад: Gomosek написал 30 минут назад: yellyex написал 33 минуты назад: Qreeq написал 3 часа назад: fitonyash написал 6 часов назад: Qreeq написал 7 часов назад: Надо признать парсер получился как минимум рабочий. Или тут есть какая-то фишка, которую я не понимаю... Надо соню прогнать @yellyex а ну ка прогони в своей проге Соню по полит топану, братанчик Там надо долго ждать. Плюс ни квен и ни дипсик не грузят тонны текста. Тут нужно использовать агента типа курсора, а в меня подписки нету. Так что либо сами, либо никак. Остановись. Они тебе нихуя скинуть не могут, а ты им от души все делаешь. Сделай это платной услугой. Первый стартап продоты Да я бескорыстный парень 🤗🤗🤗☺️☺️☺️ Цитата Поделиться сообщением Ссылка на сообщение
yellyex #1946 8 часов назад [blindfold] написал 2 минуты назад: yellyex написал 10 минут назад: Gomosek написал 32 минуты назад: yellyex написал 35 минут назад: Qreeq написал 3 часа назад: fitonyash написал 6 часов назад: Qreeq написал 7 часов назад: Надо признать парсер получился как минимум рабочий. Или тут есть какая-то фишка, которую я не понимаю... Надо соню прогнать @yellyex а ну ка прогони в своей проге Соню по полит топану, братанчик Там надо долго ждать. Плюс ни квен и ни дипсик не грузят тонны текста. Тут нужно использовать агента типа курсора, а в меня подписки нету. Так что либо сами, либо никак. Остановись. Они тебе нихуя скинуть не могут, а ты им от души все делаешь. Сделай это платной услугой. Первый стартап продоты Да я бескорыстный парень 🤗🤗🤗☺️☺️☺️ Ты забыл пожалуйста-пожалуйста Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
yellyex #1947 3 часа назад Новый парсер: Скрытый текст # -*- coding: utf-8 -*- """ Парсер форума prodota.ru (улучшенная версия) Режимы работы: - Режим 1 (только текст): сохраняет сообщения в JSON или CSV с опциональной разбивкой по размеру. - Режим 2 (полный парсинг): скачивает вложения и создаёт локальную HTML-копию темы. Особенности: - Модуль logging вместо print() для удобного логгирования. - Retry-стратегия для устойчивости к сетевым ошибкам. - Потоковая обработка данных для экономии памяти. - Динамическое определение MIME-type для медиафайлов. - Валидация путей и конфигурации. - Поддержка как локального запуска, так и Android-среды. """ # ========================== ИМПОРТЫ ========================== import os import re import sys import json import csv import time import logging from pathlib import Path from urllib.parse import urljoin, urlparse, unquote, parse_qs from typing import Optional, List, Dict, Any, Generator import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from bs4 import BeautifulSoup, Tag # ========================== КОНФИГУРАЦИЯ ========================== class Config: """Централизованное хранилище настроек.""" # Режим по умолчанию: None, 1 или 2 DEFAULT_MODE: Optional[int] = None # URL первой страницы темы (обязательно!) START_URL: str = 'https://prodota.ru/forum/topic/212849/' # Имя папки для сохранения (без спецсимволов) FOLDER_NAME: str = 'prodota_archive' # Базовая директория для сохранения (поддерживает Windows/Linux/Android) DEFAULT_OUTPUT_DIR: str = os.getenv( 'PARSER_OUTPUT_DIR', os.path.join(os.path.expanduser('~'), 'Downloads') ) # Сетевые настройки REQUEST_DELAY: float = 1.5 # Задержка между запросами (сек) REQUEST_TIMEOUT: int = 20 # Таймаут запроса (сек) MAX_RETRIES: int = 3 # Максимум повторных попыток BACKOFF_FACTOR: float = 0.5 # Коэффициент экспоненциальной задержки # Формат вывода для текстового режима TEXT_FORMAT: str = 'json' # 'json' или 'csv' # Расширения файлов IMAGE_EXTENSIONS = {'.jpg', '.jpeg', '.png', '.gif', '.bmp', '.webp', '.svg'} VIDEO_EXTENSIONS = {'.mp4', '.webm', '.ogg', '.mov', '.avi', '.mkv'} DOC_EXTENSIONS = {'.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.zip', '.rar', '.7z'} ATTACH_EXTENSIONS = IMAGE_EXTENSIONS | VIDEO_EXTENSIONS | DOC_EXTENSIONS # MIME-типы для видео VIDEO_MIME_TYPES = { '.mp4': 'video/mp4', '.webm': 'video/webm', '.ogg': 'video/ogg', '.mov': 'video/quicktime', '.avi': 'video/x-msvideo', '.mkv': 'video/x-matroska' } # CSS-селекторы для парсинга (можно адаптировать под структуру форума) SELECTORS = { 'post_blocks': ['li.ipsComment', 'div.cPost', 'article.ipsComment', 'div.post'], 'author': ['.cAuthorPane_author', '.ipsComment_author', '.author', 'a[data-username]'], 'content': ['.cPost_contentWrap', '.ipsComment_content', '.post-content', 'div[data-role="commentContent"]'], 'next_page': ['a[rel="next"]', '.ipsPagination_next a', '.next a', 'a[rel="nofollow"][data-pagetag]'] } # ========================== НАСТРОЙКА ЛОГГИРОВАНИЯ ========================== def setup_logging(log_level: int = logging.INFO, log_file: Optional[str] = None) -> logging.Logger: """Настраивает логгер с выводом в консоль и опционально в файл.""" logger = logging.getLogger('prodota_parser') logger.setLevel(log_level) # Формат сообщений formatter = logging.Formatter( '%(asctime)s [%(levelname)s] %(name)s: %(message)s', datefmt='%H:%M:%S' ) # Консольный обработчик console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logger.addHandler(console_handler) # Файловый обработчик (если указан) if log_file: file_handler = logging.FileHandler(log_file, encoding='utf-8', mode='a') file_handler.setFormatter(formatter) logger.addHandler(file_handler) return logger logger = setup_logging() # ========================== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ========================== def sanitize_filename(filename: str, max_length: int = 100) -> str: """Очищает имя файла от недопустимых символов.""" # Удаляем опасные символы для разных ОС sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename) # Обрезаем если слишком длинное if len(sanitized) > max_length: name, ext = os.path.splitext(sanitized) sanitized = name[:max_length-len(ext)] + ext return sanitized.strip() or 'unnamed' def estimate_record_size(record: Dict[str, Any]) -> int: """Приблизительно оценивает размер записи в байтах.""" return sum( len(str(v).encode('utf-8')) for v in record.values() ) + 50 # Накладные расходы на форматирование # ========================== БАЗОВЫЙ ПАРСЕР ========================== class BaseParser: """Базовый класс с общими методами для парсинга.""" def __init__( self, start_url: str, output_dir: Path, delay: float = Config.REQUEST_DELAY, timeout: int = Config.REQUEST_TIMEOUT ): self.start_url = start_url self.output_dir = output_dir self.delay = delay self.timeout = timeout # Настройка сессии с retry-логикой self.session = self._create_session() self.posts: List[Dict[str, Any]] = [] self.seen_posts: set = set() self.stats = {'pages_parsed': 0, 'posts_found': 0, 'errors': 0} def _create_session(self) -> requests.Session: """Создаёт сессию с настройками retry и заголовками.""" session = requests.Session() # Стратегия повторных запросов retry_strategy = Retry( total=Config.MAX_RETRIES, backoff_factor=Config.BACKOFF_FACTOR, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=['GET', 'HEAD'] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount('https://', adapter) session.mount('http://', adapter) # Заголовки для имитации браузера session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'Accept-Language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7', 'Accept-Encoding': 'gzip, deflate, br', 'Connection': 'keep-alive', 'Upgrade-Insecure-Requests': '1', 'Sec-Fetch-Dest': 'document', 'Sec-Fetch-Mode': 'navigate', 'Sec-Fetch-Site': 'none', }) return session def get_soup(self, url: str) -> Optional[BeautifulSoup]: """Загружает и парсит HTML-страницу.""" try: logger.debug(f"Загрузка страницы: {url}") response = self.session.get(url, timeout=self.timeout) response.raise_for_status() # Используем content (bytes) для корректного определения кодировки soup = BeautifulSoup(response.content, 'html.parser') # Проверка на блокировку или капчу if self._is_blocked(soup, response): logger.warning(f"Возможная блокировка на {url}") return None return soup except requests.exceptions.Timeout: logger.error(f"Таймаут при загрузке {url}") except requests.exceptions.ConnectionError: logger.error(f"Ошибка соединения при загрузке {url}") except requests.exceptions.HTTPError as e: logger.error(f"HTTP ошибка {e.response.status_code} для {url}") except Exception as e: logger.error(f"Неожиданная ошибка при загрузке {url}: {type(e).__name__}: {e}") self.stats['errors'] += 1 return None def _is_blocked(self, soup: BeautifulSoup, response: requests.Response) -> bool: """Проверяет признаки блокировки или капчи.""" if response.status_code != 200: return True # Проверка на типичные признаки капчи block_indicators = [ 'captcha', 'access denied', 'forbidden', '403', 'cloudflare', 'checking your browser', 'ddos-guard' ] page_text = soup.get_text().lower() return any(indicator in page_text for indicator in block_indicators) def extract_post_id(self, post_element: Tag) -> Optional[str]: """Извлекает уникальный идентификатор поста.""" # Пробуем разные атрибуты if post_element.get('id'): match = re.search(r'Comment-(\d+)|Post-(\d+)|comment_(\d+)', post_element['id'], re.I) if match: return next(g for g in match.groups() if g) if post_element.get('data-commentid'): return str(post_element['data-commentid']) if post_element.get('data-postid'): return str(post_element['data-postid']) # Пробуем найти ссылку на пост link = post_element.select_one('a[href*="#comment-"], a[href*="#post-"]') if link and link.get('href'): match = re.search(r'#(?:comment|post)-?(\d+)', link['href'], re.I) if match: return match.group(1) return None def extract_author(self, post_element: Tag) -> Optional[str]: """Извлекает имя автора поста.""" for selector in Config.SELECTORS['author']: author_elem = post_element.select_one(selector) if author_elem: text = author_elem.get_text(strip=True) if text: return text return None def extract_content(self, post_element: Tag) -> Optional[str]: """Извлекает текстовое содержимое поста.""" for selector in Config.SELECTORS['content']: content_elem = post_element.select_one(selector) if content_elem: return content_elem.get_text(separator='\n', strip=True) return None def parse_page(self, url: str) -> Optional[str]: """Парсит одну страницу и извлекает посты. Возвращает URL следующей страницы.""" soup = self.get_soup(url) if not soup: return None # Поиск блоков постов post_blocks = [] for selector in Config.SELECTORS['post_blocks']: blocks = soup.select(selector) if blocks: post_blocks.extend(blocks) break if not post_blocks: logger.warning(f"Не найдено постов на странице {url}. Возможно, изменилась структура сайта.") return None # Обработка каждого поста for block in post_blocks: post_id = self.extract_post_id(block) if not post_id or post_id in self.seen_posts: continue author = self.extract_author(block) content = self.extract_content(block) if not author or not content: logger.debug(f"Пропущен пост {post_id}: не удалось извлечь автора или контент") continue post_data = { 'author': author, 'post_number': post_id, 'content': content, 'url': f"{url.split('?')[0]}#comment-{post_id}" if url else None } self.seen_posts.add(post_id) self.posts.append(post_data) self.stats['posts_found'] += 1 self.stats['pages_parsed'] += 1 logger.info(f"Страница {self.stats['pages_parsed']}: найдено {len(post_blocks)} блоков, всего постов: {len(self.posts)}") # Поиск следующей страницы for selector in Config.SELECTORS['next_page']: next_elem = soup.select_one(selector) if next_elem and next_elem.get('href'): next_url = urljoin(url, next_elem['href']) if next_url != url: return next_url return None def run(self) -> bool: """Основной цикл парсинга.""" logger.info(f"Начинаем парсинг: {self.start_url}") logger.info(f"Директория сохранения: {self.output_dir.absolute()}") current_url = self.start_url page_num = 1 while current_url: logger.info(f"→ Страница {page_num}: {current_url}") next_url = self.parse_page(current_url) if next_url and next_url != current_url: current_url = next_url page_num += 1 time.sleep(self.delay) else: logger.info("Достигнута последняя страница или ошибка навигации") break logger.info(f"\n✅ Парсинг завершён. Страниц: {self.stats['pages_parsed']}, Постов: {len(self.posts)}, Ошибок: {self.stats['errors']}") return len(self.posts) > 0 # ========================== ТЕКСТОВЫЙ ПАРСЕР ========================== class TextParser(BaseParser): """Парсер для сохранения только текстового контента.""" def __init__( self, start_url: str, output_dir: Path, format: str = 'json', split_size_mb: float = 0 ): super().__init__(start_url, output_dir) self.format = format.lower() self.split_size_mb = split_size_mb def _save_json_streaming(self, data: List[Dict], filepath: Path): """Сохраняет данные в JSON с потоковой записью для экономии памяти.""" with open(filepath, 'w', encoding='utf-8') as f: f.write('[\n') for i, record in enumerate(data): json.dump(record, f, ensure_ascii=False, indent=2) if i < len(data) - 1: f.write(',\n') else: f.write('\n') f.write(']') logger.info(f"Данные сохранены в {filepath}") def _save_csv(self, data: List[Dict], filepath: Path): """Сохраняет данные в CSV с корректной кодировкой.""" if not data: logger.warning("Нет данных для сохранения в CSV") return with open(filepath, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.DictWriter(f, fieldnames=['author', 'post_number', 'content', 'url'], extrasaction='ignore') writer.writeheader() writer.writerows(data) logger.info(f"Данные сохранены в {filepath}") def _split_data_by_size(self, data: List[Dict], max_size_bytes: int) -> Generator[List[Dict], None, None]: """Генератор для разбивки данных на части заданного размера.""" if not data: return current_batch = [] current_size = 0 for record in data: record_size = estimate_record_size(record) # Если добавление записи превысит лимит и батч не пустой — выдаём текущий батч if current_size + record_size > max_size_bytes and current_batch: yield current_batch current_batch = [record] current_size = record_size else: current_batch.append(record) current_size += record_size # Выдаём последний батч if current_batch: yield current_batch def save(self): """Сохраняет данные с учётом формата и разбивки.""" if not self.posts: logger.warning("Нет постов для сохранения") return max_size_bytes = self.split_size_mb * 1024 * 1024 if self.split_size_mb > 0 else float('inf') if self.format == 'json': if self.split_size_mb > 0: logger.info(f"Разбивка JSON на части по {self.split_size_mb} МБ") for part_num, batch in enumerate(self._split_data_by_size(self.posts, max_size_bytes), 1): filepath = self.output_dir / f'posts_part{part_num:03d}.json' self._save_json_streaming(batch, filepath) else: filepath = self.output_dir / 'posts.json' self._save_json_streaming(self.posts, filepath) elif self.format == 'csv': if self.split_size_mb > 0: logger.info(f"Разбивка CSV на части по {self.split_size_mb} МБ") for part_num, batch in enumerate(self._split_data_by_size(self.posts, max_size_bytes), 1): filepath = self.output_dir / f'posts_part{part_num:03d}.csv' self._save_csv(batch, filepath) else: filepath = self.output_dir / 'posts.csv' self._save_csv(self.posts, filepath) else: logger.error(f"Неподдерживаемый формат: {self.format}") def run(self) -> bool: """Запускает парсинг и сохранение.""" if super().run(): self.save() return True return False # ========================== ПОЛНЫЙ АРХИВАТОР ========================== class FullArchiver(BaseParser): """Парсер с загрузкой вложений и созданием HTML-архива.""" def __init__(self, start_url: str, output_dir: Path): super().__init__(start_url, output_dir) # Структура папок для вложений self.attachments_dir = self.output_dir / 'attachments' self.media_dirs = { 'images': self.attachments_dir / 'images', 'videos': self.attachments_dir / 'videos', 'docs': self.attachments_dir / 'docs' } for dir_path in [self.attachments_dir, *self.media_dirs.values()]: dir_path.mkdir(parents=True, exist_ok=True) self.downloaded_files: Dict[str, str] = {} # URL -> локальный путь self.attachment_counter = 0 def _get_category(self, url: str) -> str: """Определяет категорию вложения по расширению.""" ext = os.path.splitext(urlparse(url).path)[1].lower() if ext in Config.IMAGE_EXTENSIONS: return 'images' elif ext in Config.VIDEO_EXTENSIONS: return 'videos' else: return 'docs' def _get_filename(self, url: str) -> str: """Извлекает или генерирует имя файла из URL.""" parsed = urlparse(url) filename = os.path.basename(unquote(parsed.path)) # Удаляем параметры запроса filename = filename.split('?')[0].split('#')[0] # Если имя пустое или без расширения — генерируем if not filename or '.' not in filename: ext = self._guess_extension_from_url(url) self.attachment_counter += 1 filename = f"file_{self.attachment_counter:05d}{ext}" return sanitize_filename(filename) def _guess_extension_from_url(self, url: str) -> str: """Пытается определить расширение файла из URL или заголовков.""" # Пробуем из пути URL ext = os.path.splitext(urlparse(url).path)[1].lower() if ext in Config.ATTACH_EXTENSIONS: return ext # Пробуем HEAD-запрос try: response = self.session.head(url, timeout=5, allow_redirects=True) content_type = response.headers.get('content-type', '').lower() if 'image/jpeg' in content_type: return '.jpg' elif 'image/png' in content_type: return '.png' elif 'image/gif' in content_type: return '.gif' elif 'video/mp4' in content_type: return '.mp4' elif 'application/pdf' in content_type: return '.pdf' except: pass return '.bin' def _get_video_mime_type(self, filename: str) -> str: """Возвращает MIME-type для видеофайла по расширению.""" ext = os.path.splitext(filename)[1].lower() return Config.VIDEO_MIME_TYPES.get(ext, 'video/mp4') def download_file(self, url: str, category: str) -> Optional[str]: """Скачивает файл и возвращает относительный путь.""" if url in self.downloaded_files: return self.downloaded_files[url] filename = self._get_filename(url) subdir = self.media_dirs.get(category, self.media_dirs['docs']) local_path = subdir / filename # Обработка коллизий имён counter = 1 while local_path.exists(): name, ext = os.path.splitext(filename) filename = f"{name}_{counter}{ext}" local_path = subdir / filename counter += 1 try: logger.debug(f"Скачивание: {url} → {local_path.name}") with self.session.get(url, stream=True, timeout=Config.REQUEST_TIMEOUT) as response: response.raise_for_status() with open(local_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) # Сохраняем относительный путь для HTML rel_path = local_path.relative_to(self.output_dir).as_posix() self.downloaded_files[url] = rel_path logger.info(f"✓ Скачан: {rel_path}") return rel_path except Exception as e: logger.error(f"✗ Ошибка скачивания {url}: {type(e).__name__}: {e}") return None def _is_attachment_url(self, url: str) -> bool: """Определяет, является ли ссылка вложением.""" if not url or url.startswith(('#', 'javascript:', 'mailto:')): return False parsed = urlparse(url) path_lower = parsed.path.lower() # Проверка по расширению ext = os.path.splitext(path_lower)[1] if ext in Config.ATTACH_EXTENSIONS: return True # Проверка по типичным путям загрузок attachment_patterns = ['/uploads/', '/monthly_', '/attachments/', '/public/'] if any(pattern in path_lower for pattern in attachment_patterns): return True return False def process_html_content(self, html_content: str, base_url: str) -> str: """Обрабатывает HTML: заменяет внешние ссылки на вложения локальными.""" if not html_content: return html_content soup = BeautifulSoup(html_content, 'html.parser') # Обработка ссылок <a> с вложениями for link in soup.find_all('a', href=True): href = link['href'] absolute_url = urljoin(base_url, href) if not self._is_attachment_url(absolute_url): continue category = self._get_category(absolute_url) local_path = self.download_file(absolute_url, category) if not local_path: continue # Заменяем ссылку и добавляем превью для медиа ext = os.path.splitext(local_path)[1].lower() if ext in Config.IMAGE_EXTENSIONS: # Для изображений заменяем ссылку на <img> img = soup.new_tag('img', src=local_path, alt=link.get_text(strip=True), style='max-width:100%;height:auto;display:block;margin:5px 0;') link.replace_with(img) elif ext in Config.VIDEO_EXTENSIONS: # Для видео создаём <video> с правильным MIME-type video = soup.new_tag('video', controls=True, style='max-width:100%;height:auto;display:block;margin:5px 0;') source = soup.new_tag('source', src=local_path, type=self._get_video_mime_type(local_path)) video.append(source) link.replace_with(video) else: # Для документов просто обновляем ссылку link['href'] = local_path link['target'] = '_blank' link['download'] = '' # Обработка прямых изображений <img> for img in soup.find_all('img', src=True): src = img['src'] absolute_url = urljoin(base_url, src) if self._is_attachment_url(absolute_url): local_path = self.download_file(absolute_url, 'images') if local_path: img['src'] = local_path # Удаляем lazy-loading атрибуты, которые могут мешать локальному отображению img.pop('data-src', None) img.pop('loading', None) return str(soup) def extract_post_data(self, post_element: Tag, base_url: str) -> Optional[Dict[str, Any]]: """Переопределённый метод для извлечения поста с обработкой вложений.""" author = self.extract_author(post_element) post_id = self.extract_post_id(post_element) if not author or not post_id: return None # Ищем контент content_elem = None for selector in Config.SELECTORS['content']: content_elem = post_element.select_one(selector) if content_elem: break if not content_elem: return None # Обрабатываем вложения в HTML inner_html = str(content_elem) processed_html = self.process_html_content(inner_html, base_url) return { 'author': author, 'post_number': post_id, 'html': processed_html, 'url': f"{base_url.split('?')[0]}#comment-{post_id}" } def generate_html(self) -> str: """Генерирует финальную HTML-страницу архива.""" template = """<!DOCTYPE html> <html lang="ru"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>{title}</title> <style> :root {{ --bg-primary: #f5f7fa; --bg-post: #ffffff; --text-primary: #2c3e50; --text-secondary: #7f8c8d; --border-color: #e0e6ed; --accent: #3498db; }} * {{ box-sizing: border-box; margin: 0; padding: 0; }} body {{ font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif; background: var(--bg-primary); color: var(--text-primary); line-height: 1.6; padding: 20px; }} .container {{ max-width: 1000px; margin: 0 auto; }} header {{ background: var(--bg-post); padding: 20px; border-radius: 8px; margin-bottom: 20px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); text-align: center; }} header h1 {{ font-size: 1.5rem; margin-bottom: 8px; }} header p {{ color: var(--text-secondary); font-size: 0.9rem; }} .post {{ background: var(--bg-post); border: 1px solid var(--border-color); border-radius: 8px; padding: 20px; margin-bottom: 16px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); }} .post-header {{ display: flex; justify-content: space-between; align-items: center; padding-bottom: 12px; border-bottom: 1px solid var(--border-color); margin-bottom: 12px; flex-wrap: wrap; gap: 8px; }} .post-author {{ font-weight: 600; color: var(--accent); }} .post-meta {{ font-size: 0.85rem; color: var(--text-secondary); }} .post-content {{ font-size: 1rem; overflow-wrap: break-word; }} .post-content img, .post-content video {{ max-width: 100%; height: auto; border-radius: 4px; margin: 10px 0; }} .post-content a {{ color: var(--accent); text-decoration: none; }} .post-content a:hover {{ text-decoration: underline; }} .post-content pre {{ background: #f8f9fa; padding: 12px; border-radius: 4px; overflow-x: auto; margin: 10px 0; }} .post-content code {{ background: #f1f3f5; padding: 2px 6px; border-radius: 3px; font-family: monospace; }} footer {{ text-align: center; padding: 20px; color: var(--text-secondary); font-size: 0.85rem; }} @media (max-width: 768px) {{ body {{ padding: 12px; }} .post {{ padding: 16px; }} .post-header {{ flex-direction: column; align-items: flex-start; }} }} </style> </head> <body> <div class="container"> <header> <h1>📦 Архив темы</h1> <p>Источник: {source_url}<br>Всего сообщений: {total_posts}<br>Дата создания: {generated_date}</p> </header> {posts_html} <footer> <p>Создано парсером prodota.ru • {generated_date}</p> </footer> </div> </body> </html>""" from datetime import datetime posts_html = [] for post in self.posts: post_html = f""" <article class="post" id="post-{post['post_number']}"> <div class="post-header"> <span class="post-author">{post['author']}</span> <span class="post-meta"> Пост #{post['post_number']} {f'• <a href="{post["url"]}">🔗 Оригинал</a>' if post.get('url') else ''} </span> </div> <div class="post-content"> {post['html']} </div> </article> """ posts_html.append(post_html) return template.format( title=f"Архив: {os.path.basename(self.output_dir)}", source_url=self.start_url, total_posts=len(self.posts), generated_date=datetime.now().strftime('%d.%m.%Y %H:%M'), posts_html='\n'.join(posts_html) ) def run(self) -> bool: """Запускает полный парсинг с загрузкой вложений.""" if not super().run(): return False logger.info("🔄 Генерация HTML-архива...") try: html_content = self.generate_html() index_path = self.output_dir / 'index.html' with open(index_path, 'w', encoding='utf-8') as f: f.write(html_content) logger.info(f"✅ HTML-архив сохранён: {index_path.absolute()}") # Статистика по вложениям if self.downloaded_files: stats = {} for path in self.downloaded_files.values(): category = os.path.basename(os.path.dirname(path)) stats[category] = stats.get(category, 0) + 1 logger.info(f"📎 Скачано вложений: {len(self.downloaded_files)}") for cat, count in stats.items(): logger.info(f" • {cat}: {count}") return True except Exception as e: logger.error(f"Ошибка при генерации HTML: {type(e).__name__}: {e}") return False # ========================== УТИЛИТЫ ========================== def validate_output_dir(base_dir: str, folder_name: str) -> Path: """Проверяет и создаёт директорию для вывода.""" try: output_path = Path(base_dir) / sanitize_filename(folder_name) output_path.mkdir(parents=True, exist_ok=True) # Проверка на запись test_file = output_path / '.permission_test' test_file.touch() test_file.unlink() logger.info(f"Директория готова: {output_path.absolute()}") return output_path except PermissionError: logger.error(f"Нет прав на запись в {base_dir}") # Пробуем текущую директорию fallback = Path.cwd() / sanitize_filename(folder_name) fallback.mkdir(parents=True, exist_ok=True) logger.warning(f"Используем резервную директорию: {fallback.absolute()}") return fallback except Exception as e: logger.error(f"Ошибка при создании директории: {e}") sys.exit(1) def get_user_mode() -> int: """Запрашивает у пользователя режим работы.""" print("\n" + "="*50) print("🔧 ПАРСЕР FORUM.PRODOTA.RU") print("="*50) print("\nВыберите режим:") print(" 1 — Только текст (JSON/CSV)") print(" 2 — Полный архив (с вложениями и HTML)") while True: choice = input("\nВведите 1 или 2: ").strip() if choice in ('1', '2'): return int(choice) print("❌ Пожалуйста, введите 1 или 2") def get_split_size() -> float: """Запрашивает размер части для разбивки.""" print("\n📦 Разбивка результата на файлы:") print(" Введите максимальный размер части в МБ") print(" (0 — без разбивки, рекомендуется 10-50)") while True: try: value = float(input("Размер в МБ: ").strip()) return max(0, value) except ValueError: print("❌ Введите числовое значение") # ========================== ТОЧКА ВХОДА ========================== def main(): """Основная функция запуска.""" # Определяем режим mode = Config.DEFAULT_MODE if mode is None: mode = get_user_mode() else: logger.info(f"Используем режим {mode} из конфигурации") # Валидация конфигурации if not Config.START_URL or Config.START_URL.startswith('https://prodota.ru/forum/topic/XXXX'): logger.error("❌ Укажите корректный START_URL в конфигурации!") sys.exit(1) # Подготовка директории output_dir = validate_output_dir(Config.DEFAULT_OUTPUT_DIR, Config.FOLDER_NAME) # Запуск в зависимости от режима try: if mode == 1: # Текстовый режим split_mb = get_split_size() if Config.split_size_mb is None else Config.split_size_mb parser = TextParser( start_url=Config.START_URL, output_dir=output_dir, format=Config.TEXT_FORMAT, split_size_mb=split_mb ) success = parser.run() else: # Полный режим print(f"\n📥 Режим: полный архив с вложениями") print(f" Изображения: {len(Config.IMAGE_EXTENSIONS)} форматов") print(f" Видео: {len(Config.VIDEO_EXTENSIONS)} форматов") print(f" Документы: {len(Config.DOC_EXTENSIONS)} форматов") archiver = FullArchiver( start_url=Config.START_URL, output_dir=output_dir ) success = archiver.run() # Финальный отчёт print("\n" + "="*50) if success: print("✅ РАБОТА ЗАВЕРШЕНА УСПЕШНО") print(f"📁 Файлы сохранены в: {output_dir.absolute()}") else: print("⚠️ РАБОТА ЗАВЕРШЕНА С ПРЕДУПРЕЖДЕНИЯМИ") print(" Проверьте логи выше для деталей") print("="*50) except KeyboardInterrupt: logger.warning("\n⚠️ Прервано пользователем") sys.exit(130) except Exception as e: logger.exception(f"❌ Критическая ошибка: {type(e).__name__}: {e}") sys.exit(1) if __name__ == '__main__': main() Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
fitonyash #1948 3 часа назад еблан дебил Соню прогони через него Qreeq понравилось это Цитата Поделиться сообщением Ссылка на сообщение
yellyex #1949 3 часа назад fitonyash написал 13 минут назад: еблан дебил Соню прогони через него Качаешь python последней версии. Устанавливаешь все библиотеки. Если не знаешь какие, то скопируй код в нейронку бесплатную квег или дип сик, и узнай как установить. Затем пользуйся сам на здоровье. И последний код с ошибками. Сейчас веду исправление. И уточни, что за Соня и в какой теме. Пожалуйста. Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
yellyex #1950 1 час назад Новый исправленный парсер: Скрытый текст # -*- coding: utf-8 -*- """ ================================================================================ ПАРСЕР ФОРУМА PRODOTA.RU (версия 4.3 — исправленная очистка) ================================================================================ 📋 ОПИСАНИЕ: Программа скачивает сообщения с указанной темы форума prodota.ru и сохраняет их в удобном формате. Поддерживает два режима работы. 🔹 РЕЖИМ 1 — Только текст (рекомендуется): • Сохраняет: автор, номер поста, время, текст сообщения • Формат: JSON или CSV (на выбор) • Опция: разбивка на файлы заданного размера (в МБ) • Очистка: удаляет подписи, лайки, кнопки интерфейса, метаданные • Цитаты: сохраняет структуру "цитата + ответ автора" 🔹 РЕЖИМ 2 — Полный архив: • Всё из режима 1 + скачивание вложений (картинки, видео, документы) • Создание локальной HTML-страницы для офлайн-просмотра • Вложения сохраняются в подпапки: attachments/images, videos, docs 📁 ВЫХОДНЫЕ ДАННЫЕ (режим 1): Каждая запись содержит ТОЛЬКО 4 поля: { "author": "никнейм_автора", "post_number": "ID_поста", "post_time": "время_публикации", "post_text": "очищенный_текст_сообщения" } ================================================================================ 🔧 НАСТРОЙКА ПРОГРАММЫ (редактируйте этот раздел перед запуском) ================================================================================ """ # ========================== ИМПОРТЫ БИБЛИОТЕК ========================== import os import re import sys import json import csv import time import logging from pathlib import Path from urllib.parse import urljoin from typing import Optional, List, Dict, Any, Generator import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry from bs4 import BeautifulSoup, Tag # Цветной вывод в консоль (кросс-платформенно) try: from colorama import init, Fore, Back, Style init(autoreset=True) except ImportError: # Если colorama не установлена, создаём заглушки class Fore: RED = GREEN = YELLOW = CYAN = MAGENTA = WHITE = RESET = '' class Style: BRIGHT = RESET_ALL = '' class Back: RESET = '' # ========================== НАСТРОЙКИ ПОЛЬЗОВАТЕЛЯ ========================== # ▼▼▼ ИЗМЕНЯЙТЕ ЗНАЧЕНИЯ НИЖЕ ПОД СВОИ ЗАДАЧИ ▼▼▼ class Config: """ 🔹 ВСЕ НАСТРОЙКИ ПРОГРАММЫ НАХОДЯТСЯ ЗДЕСЬ 🔹 Как менять: 1. Найдите нужную строку ниже 2. Замените значение в кавычках или число на своё 3. Сохраните файл и запустите парсер Пример: START_URL: str = 'https://prodota.ru/forum/topic/123456/' ↑↑↑ вставьте сюда ссылку на вашу тему """ # ───────────────────────────────────────────────────────────── # 🌐 ОБЯЗАТЕЛЬНЫЕ НАСТРОЙКИ # ───────────────────────────────────────────────────────────── # Режим работы по умолчанию: # 1 = только текст (быстро, мало места) # 2 = полный архив с вложениями (медленнее, нужно место) # None = программа спросит режим при запуске DEFAULT_MODE: Optional[int] = None # 🔗 ССЫЛКА НА ПЕРВУЮ СТРАНИЦУ ТЕМЫ (ОБЯЗАТЕЛЬНО!) # Вставьте полную ссылку на тему, которую нужно скачать: START_URL: str = 'https://prodota.ru/forum/topic/224233/' # 📁 ИМЯ ПАПКИ ДЛЯ СОХРАНЕНИЯ РЕЗУЛЬТАТА # Используйте латиницу, цифры, подчёркивания (без пробелов и кириллицы): FOLDER_NAME: str = 'prodota_archive' # 📂 БАЗОВАЯ ПАПКА ДЛЯ СОХРАНЕНИЯ # На Android автоматически определяется папка Download. # На других ОС — папка "Загрузки" пользователя. @staticmethod def _default_output_dir() -> str: # Проверяем, запущено ли на Android (по наличию характерной папки или переменной) if os.path.exists('/storage/emulated/0/Download'): return '/storage/emulated/0/Download' return os.path.join(os.path.expanduser('~'), 'Downloads') DEFAULT_OUTPUT_DIR: str = os.getenv('PARSER_OUTPUT_DIR', _default_output_dir()) # ───────────────────────────────────────────────────────────── # ⚙️ ДОПОЛНИТЕЛЬНЫЕ НАСТРОЙКИ (можно не менять) # ───────────────────────────────────────────────────────────── # Задержка между запросами к сайту (в секундах) REQUEST_DELAY: float = 1.0 # Таймаут ожидания ответа от сайта (в секундах) REQUEST_TIMEOUT: int = 20 # Количество повторных попыток при ошибке соединения MAX_RETRIES: int = 3 # Множитель экспоненциальной задержки между повторными попытками BACKOFF_FACTOR: float = 1.0 # Формат сохранения для режима 1: 'json' или 'csv' TEXT_FORMAT: str = 'json' # Максимальный размер одного файла при разбивке (в МБ), 0 = не разбивать SPLIT_SIZE_MB: float = 0 # CSS-селекторы для поиска элементов на странице SELECTORS = { 'post_blocks': ['li.ipsComment', 'div.cPost', 'article.ipsComment', 'div.post'], 'author': ['.cAuthorPane_author', '.ipsComment_author', '.author', 'a[data-username]'], 'content': ['.cPost_contentWrap', '.ipsComment_content', '.post-content', '[data-role="commentContent"]'], 'next_page': ['a[rel="next"]', '.ipsPagination_next a', '.next a'], 'last_page': ['a.ipsPagination_last', 'a[data-page="last"]', 'li.ipsPagination_last a'] } # ========================== НАСТРОЙКА ЛОГГИРОВАНИЯ С ЦВЕТАМИ ========================== class ColoredFormatter(logging.Formatter): """Форматтер, окрашивающий уровень логирования в цвет.""" def format(self, record): level_color = { 'INFO': Fore.CYAN, 'WARNING': Fore.YELLOW, 'ERROR': Fore.RED, 'CRITICAL': Fore.RED + Back.WHITE, }.get(record.levelname, Fore.WHITE) original_levelname = record.levelname record.levelname = f"{level_color}{original_levelname}{Style.RESET_ALL}" result = super().format(record) record.levelname = original_levelname return result def setup_logging(level: int = logging.INFO) -> logging.Logger: logger = logging.getLogger('prodota_parser') logger.setLevel(level) if not logger.handlers: formatter = ColoredFormatter('%(asctime)s [%(levelname)s] %(message)s', datefmt='%H:%M:%S') console_handler = logging.StreamHandler(sys.stdout) console_handler.setFormatter(formatter) logger.addHandler(console_handler) return logger logger = setup_logging() # ========================== ФУНКЦИЯ ОЧИСТКИ КОНТЕНТА ========================== # Удаляет из текста поста всё лишнее: подписи, лайки, кнопки, метаданные # Сохраняет: время публикации, цитаты + ответ автора, основной текст def clean_post_content(raw_content: str) -> Dict[str, str]: """ Очищает сырой текст поста от интерфейсного мусора. Что удаляется: • Номер поста (#1, #22 и т.д.) • Метки времени в начале текста (дубликат, т.к. время извлекается отдельно) • Блоки лайков ("понравилось это" и имена пользователей) • Кнопки интерфейса ("Цитата", "Поделиться сообщением", "Ссылка на сообщение") • Текст ПОСЛЕ кнопок интерфейса (автоматические подписи пользователей) Что сохраняется: • Время публикации (извлекается в отдельное поле) • Цитаты других постов (структура: "User написал... : текст") • Ответ автора на цитату Возвращает: {'post_time': '...', 'post_text': '...'} """ if not raw_content: return {'post_time': '', 'post_text': ''} text = raw_content.strip() post_time = '' # 1. Извлекаем время публикации time_match = re.search( r'(?:Опубликовано:\s*)?' r'(вчера|сегодня|\d+\s*(?:часов?|минут?|секунд?|дней?|месяцев?|лет?)\s*назад|' r'\d{2}\.\d{2}\.\d{4}\s+в\s+\d{2}:\d{2})', text, re.IGNORECASE ) if time_match: post_time = time_match.group(1) if time_match.group(1) else time_match.group(0).replace('Опубликовано:', '').strip() text = text.replace(time_match.group(0), '', 1).strip() # 2. Удаляем номер поста в начале (#1, #22...) text = re.sub(r'^#\d+\s*', '', text).strip() # 3. Удаляем блоки лайков (безопасная версия, не вызывающая ошибок) # Удаляем строки, содержащие "понравилось это", а также предыдущие строки с именами lines = text.split('\n') cleaned_lines = [] skip_next = False for i, line in enumerate(lines): if 'понравилось это' in line.lower(): # Пропускаем эту строку и, возможно, предыдущие строки-имена # Удаляем также предыдущие строки, если они содержат только имена и запятые if cleaned_lines and re.match(r'^[\s\w,]+$', cleaned_lines[-1]): cleaned_lines.pop() continue # Удаляем строки, которые целиком состоят из имён с запятыми (без "понравилось") if re.match(r'^[\s\w,]+$', line) and len(line) < 100: continue cleaned_lines.append(line) text = '\n'.join(cleaned_lines) # 4. Отсекаем всё, что идёт после кнопок интерфейса (это подписи) for marker in ['Ссылка на сообщение', 'Поделиться сообщением', 'Цитата']: idx = text.find(marker) if idx != -1: text = text[:idx] break # 5. Убираем лишние пустые строки text = re.sub(r'\n{3,}', '\n\n', text).strip() return {'post_time': post_time, 'post_text': text} # ========================== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ========================== def sanitize_filename(filename: str, max_length: int = 100) -> str: """ Очищает имя файла от символов, недопустимых в разных ОС. """ sanitized = re.sub(r'[<>:"/\\|?*\x00-\x1f]', '_', filename) if len(sanitized) > max_length: name, ext = os.path.splitext(sanitized) sanitized = name[:max_length-len(ext)] + ext return sanitized.strip() or 'unnamed' def estimate_record_size(record: Dict[str, Any]) -> int: """ Приблизительно оценивает размер одной записи в байтах. Нужно для разбивки больших файлов на части. """ return sum(len(str(v).encode('utf-8')) for v in record.values()) + 50 # ========================== БАЗОВЫЙ КЛАСС ПАРСЕРА ========================== class BaseParser: """ Базовый класс с общими методами для парсинга страниц форума. """ def __init__(self, start_url: str, output_dir: Path, delay: float = Config.REQUEST_DELAY, timeout: int = Config.REQUEST_TIMEOUT): self.start_url = start_url self.output_dir = output_dir self.delay = delay self.timeout = timeout self.session = self._create_session() self.posts: List[Dict[str, Any]] = [] self.seen_posts: set = set() self.stats = {'pages_parsed': 0, 'posts_found': 0, 'errors': 0} self.total_pages: Optional[int] = None # общее количество страниц (если удастся определить) self.start_time: float = 0.0 def _create_session(self) -> requests.Session: """ Создаёт сессию с настройками повторных запросов и заголовками браузера. """ session = requests.Session() # Стратегия повтора при ошибках сети retry_strategy = Retry( total=Config.MAX_RETRIES, backoff_factor=Config.BACKOFF_FACTOR, status_forcelist=[429, 500, 502, 503, 504], allowed_methods=['GET', 'HEAD'] ) adapter = HTTPAdapter(max_retries=retry_strategy) session.mount('https://', adapter) session.mount('http://', adapter) # Заголовки для имитации обычного браузера session.headers.update({ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36', 'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8', 'Accept-Language': 'ru-RU,ru;q=0.9,en;q=0.8' }) return session def get_soup(self, url: str) -> Optional[BeautifulSoup]: """ Загружает HTML-страницу и возвращает объект BeautifulSoup для парсинга. """ try: response = self.session.get(url, timeout=self.timeout) response.raise_for_status() soup = BeautifulSoup(response.content, 'html.parser') # Проверка на блокировку (капча, 403 и т.п.) if self._is_blocked(soup, response): logger.warning(f"Возможная блокировка на {url}") return None return soup except requests.exceptions.Timeout: logger.error(f"Таймаут при загрузке {url}") except requests.exceptions.ConnectionError: logger.error(f"Ошибка соединения при загрузке {url}") except requests.exceptions.HTTPError as e: logger.error(f"HTTP ошибка {e.response.status_code} для {url}") except Exception as e: logger.error(f"Неожиданная ошибка при загрузке {url}: {type(e).__name__}: {e}") self.stats['errors'] += 1 return None def _is_blocked(self, soup: BeautifulSoup, response: requests.Response) -> bool: """ Проверяет, не заблокировал ли сайт запрос (капча, доступ запрещён). """ if response.status_code != 200: return True block_indicators = ['captcha', 'access denied', 'forbidden', '403', 'cloudflare', 'checking your browser'] page_text = soup.get_text().lower() return any(indicator in page_text for indicator in block_indicators) def extract_post_id(self, post_element: Tag) -> Optional[str]: """ Извлекает уникальный номер поста из HTML-элемента. """ if post_element.get('id'): match = re.search(r'Comment-(\d+)|Post-(\d+)|comment_(\d+)', post_element['id'], re.I) if match: return next(g for g in match.groups() if g) if post_element.get('data-commentid'): return str(post_element['data-commentid']) link = post_element.select_one('a[href*="#comment-"], a[href*="#post-"]') if link and link.get('href'): match = re.search(r'#(?:comment|post)-?(\d+)', link['href'], re.I) if match: return match.group(1) return None def extract_author(self, post_element: Tag) -> Optional[str]: """ Извлекает никнейм автора поста. """ for selector in Config.SELECTORS['author']: author_elem = post_element.select_one(selector) if author_elem: text = author_elem.get_text(strip=True) if text: return text return None def extract_raw_content(self, post_element: Tag) -> Optional[str]: """ Извлекает сырой текст поста (до очистки). """ for selector in Config.SELECTORS['content']: content_elem = post_element.select_one(selector) if content_elem: return content_elem.get_text(separator='\n', strip=True) return None def get_total_pages(self, soup: BeautifulSoup, base_url: str) -> Optional[int]: """ Пытается извлечь общее количество страниц из пагинации. """ for selector in Config.SELECTORS['last_page']: last_elem = soup.select_one(selector) if last_elem and last_elem.get('href'): match = re.search(r'page[=_-]?(\d+)', last_elem['href']) if match: return int(match.group(1)) # Альтернатива: ищем число среди элементов пагинации pagination_text = soup.get_text() match = re.search(r'Страница\s+\d+\s+из\s+(\d+)', pagination_text, re.IGNORECASE) if match: return int(match.group(1)) return None def parse_page(self, url: str) -> Optional[str]: """ Парсит одну страницу: извлекает посты и находит ссылку на следующую. Возвращает URL следующей страницы или None, если это последняя. """ page_start = time.time() soup = self.get_soup(url) if not soup: return None # Если общее количество страниц ещё не известно, пытаемся определить if self.total_pages is None: self.total_pages = self.get_total_pages(soup, url) if self.total_pages: logger.info(f"Общее количество страниц в теме: {self.total_pages}") # Поиск блоков с постами post_blocks = [] for selector in Config.SELECTORS['post_blocks']: blocks = soup.select(selector) if blocks: post_blocks.extend(blocks) break if not post_blocks: logger.warning(f"Не найдено постов на странице {url}.") return None posts_before = len(self.posts) # Обработка каждого поста for block in post_blocks: post_id = self.extract_post_id(block) if not post_id or post_id in self.seen_posts: continue author = self.extract_author(block) raw_content = self.extract_raw_content(block) if not author or not raw_content: logger.debug(f"Пропущен пост {post_id}: не удалось извлечь автора или контент") continue # Очистка контента и добавление в список cleaned = clean_post_content(raw_content) if not cleaned['post_text']: continue self.seen_posts.add(post_id) self.posts.append({ 'author': author, 'post_number': post_id, 'post_time': cleaned['post_time'], 'post_text': cleaned['post_text'] }) self.stats['posts_found'] += 1 posts_added = len(self.posts) - posts_before # Увеличиваем счётчик страниц self.stats['pages_parsed'] += 1 elapsed = time.time() - page_start # Красивый вывод прогресса if self.total_pages: progress = self.stats['pages_parsed'] / self.total_pages * 100 bar_length = 30 filled = int(bar_length * self.stats['pages_parsed'] // self.total_pages) bar = '█' * filled + '░' * (bar_length - filled) print(f"\r{Fore.CYAN}▶ Страница {self.stats['pages_parsed']}/{self.total_pages} [{bar}] {progress:.1f}% " f"| Постов на странице: {posts_added} | Всего: {len(self.posts)} | {elapsed:.1f}с{Style.RESET_ALL}", end='') else: print(f"\r{Fore.CYAN}▶ Страница {self.stats['pages_parsed']} (общее количество неизвестно) " f"| Постов на странице: {posts_added} | Всего: {len(self.posts)} | {elapsed:.1f}с{Style.RESET_ALL}", end='') sys.stdout.flush() # Поиск ссылки на следующую страницу for selector in Config.SELECTORS['next_page']: next_elem = soup.select_one(selector) if next_elem and next_elem.get('href'): next_url = urljoin(url, next_elem['href']) if next_url != url: return next_url return None def run(self) -> bool: """ Основной цикл парсинга: проходит по всем страницам темы. """ self.start_time = time.time() print(f"\n{Fore.GREEN}┌────────────────────────────────────────────────────────────┐{Style.RESET_ALL}") print(f"{Fore.GREEN}│{Style.RESET_ALL} {Fore.YELLOW}ПАРСЕР PRODOTA.RU{Style.RESET_ALL} {Fore.GREEN}│{Style.RESET_ALL}") print(f"{Fore.GREEN}├────────────────────────────────────────────────────────────┤{Style.RESET_ALL}") print(f"{Fore.GREEN}│{Style.RESET_ALL} URL: {self.start_url}") print(f"{Fore.GREEN}│{Style.RESET_ALL} Сохранение: {self.output_dir.absolute()}") print(f"{Fore.GREEN}└────────────────────────────────────────────────────────────┘{Style.RESET_ALL}\n") current_url = self.start_url # Сначала загрузим первую страницу, чтобы определить общее количество страниц first_soup = self.get_soup(current_url) if first_soup: self.total_pages = self.get_total_pages(first_soup, current_url) if self.total_pages: logger.info(f"Общее количество страниц в теме: {self.total_pages}") while current_url: next_url = self.parse_page(current_url) if next_url and next_url != current_url: current_url = next_url time.sleep(self.delay) else: break print() # перевод строки после прогресс-бара elapsed_total = time.time() - self.start_time logger.info(f"\n✅ Парсинг завершён. Страниц: {self.stats['pages_parsed']}, Постов: {len(self.posts)}, Ошибок: {self.stats['errors']}, Время: {elapsed_total:.1f} сек") return len(self.posts) > 0 # ========================== КЛАСС ДЛЯ ТЕКСТОВОГО РЕЖИМА ========================== class TextParser(BaseParser): """ Парсер для режима 1: сохраняет только текстовые данные в JSON или CSV. """ def __init__(self, start_url: str, output_dir: Path, format: str = 'json', split_size_mb: float = 0): super().__init__(start_url, output_dir) self.format = format.lower() self.split_size_mb = split_size_mb def _save_json_streaming(self, data: List[Dict], filepath: Path): """ Сохраняет данные в JSON с потоковой записью (экономит память). """ with open(filepath, 'w', encoding='utf-8') as f: f.write('[\n') for i, record in enumerate(data): json.dump(record, f, ensure_ascii=False, indent=2) if i < len(data) - 1: f.write(',\n') else: f.write('\n') f.write(']') logger.info(f"Данные сохранены в {filepath}") def _save_csv(self, data: List[Dict], filepath: Path): """ Сохраняет данные в CSV с разделителем точка с запятой (;). Кодировка UTF-8-SIG для корректного открытия в Excel. """ if not data: logger.warning("Нет данных для сохранения в CSV") return with open(filepath, 'w', newline='', encoding='utf-8-sig') as f: writer = csv.writer(f, delimiter=';', quoting=csv.QUOTE_ALL) writer.writerow(['author', 'post_number', 'post_time', 'post_text']) for p in data: writer.writerow([p['author'], p['post_number'], p['post_time'], p['post_text']]) logger.info(f"Данные сохранены в {filepath}") def _split_data_by_size(self, data: List[Dict], max_size_bytes: int) -> Generator[List[Dict], None, None]: """ Генератор для разбивки данных на части заданного размера (в байтах). """ if not data: return current_batch = [] current_size = 0 for record in data: record_size = estimate_record_size(record) if current_size + record_size > max_size_bytes and current_batch: yield current_batch current_batch = [record] current_size = record_size else: current_batch.append(record) current_size += record_size if current_batch: yield current_batch def save(self): """ Сохраняет собранные посты в файл(ы) с учётом формата и разбивки. """ if not self.posts: logger.warning("Нет постов для сохранения") return max_size_bytes = self.split_size_mb * 1024 * 1024 if self.split_size_mb > 0 else float('inf') if self.format == 'json': if self.split_size_mb > 0: logger.info(f"Разбивка JSON на части по {self.split_size_mb} МБ") for part_num, batch in enumerate(self._split_data_by_size(self.posts, max_size_bytes), 1): filepath = self.output_dir / f'posts_part{part_num:03d}.json' self._save_json_streaming(batch, filepath) else: filepath = self.output_dir / 'posts.json' self._save_json_streaming(self.posts, filepath) elif self.format == 'csv': if self.split_size_mb > 0: logger.info(f"Разбивка CSV на части по {self.split_size_mb} МБ") for part_num, batch in enumerate(self._split_data_by_size(self.posts, max_size_bytes), 1): filepath = self.output_dir / f'posts_part{part_num:03d}.csv' self._save_csv(batch, filepath) else: filepath = self.output_dir / 'posts.csv' self._save_csv(self.posts, filepath) else: logger.error(f"Неподдерживаемый формат: {self.format}") def run(self) -> bool: """ Запускает парсинг и затем сохраняет результат. """ if super().run(): self.save() return True return False # ========================== КЛАСС ДЛЯ ПОЛНОГО АРХИВА ========================== class FullArchiver(BaseParser): """ Парсер для режима 2: скачивает вложения и создаёт HTML-архив. (Упрощённая версия — можно доработать при необходимости) """ def __init__(self, start_url: str, output_dir: Path): super().__init__(start_url, output_dir) # Создаём папки для вложений self.attachments_dir = self.output_dir / 'attachments' self.media_dirs = { 'images': self.attachments_dir / 'images', 'videos': self.attachments_dir / 'videos', 'docs': self.attachments_dir / 'docs' } for dir_path in [self.attachments_dir, *self.media_dirs.values()]: dir_path.mkdir(parents=True, exist_ok=True) def run(self) -> bool: """ Запускает парсинг и сохраняет результат в текстовом формате. (Полная реализация скачивания вложений требует доработки) """ logger.warning("Режим полного архива: скачивание вложений временно отключено.") logger.warning("Используется текстовый парсинг. Для полной версии обратитесь к разработчику.") return super().run() # ========================== ПОЛЕЗНЫЕ ФУНКЦИИ ========================== def validate_output_dir(base_dir: str, folder_name: str) -> Path: """ Проверяет и создаёт директорию для сохранения результата. Если нет прав на запись — использует текущую папку. """ try: output_path = Path(base_dir) / sanitize_filename(folder_name) output_path.mkdir(parents=True, exist_ok=True) # Проверка на возможность записи test_file = output_path / '.permission_test' test_file.touch() test_file.unlink() logger.info(f"Директория готова: {output_path.absolute()}") return output_path except PermissionError: logger.error(f"Нет прав на запись в {base_dir}") fallback = Path.cwd() / sanitize_filename(folder_name) fallback.mkdir(parents=True, exist_ok=True) logger.warning(f"Используем резервную директорию: {fallback.absolute()}") return fallback except Exception as e: logger.error(f"Ошибка при создании директории: {e}") sys.exit(1) def get_user_mode() -> int: """ Запрашивает у пользователя выбор режима работы. """ print(f"\n{Fore.GREEN}┌────────────────────────────────────────────────────────────┐{Style.RESET_ALL}") print(f"{Fore.GREEN}│{Style.RESET_ALL} {Fore.CYAN}Выберите режим работы:{Style.RESET_ALL} {Fore.GREEN}│{Style.RESET_ALL}") print(f"{Fore.GREEN}│{Style.RESET_ALL} {Fore.GREEN}[1]{Style.RESET_ALL} Только текст (быстро, мало места) {Fore.GREEN}│{Style.RESET_ALL}") print(f"{Fore.GREEN}│{Style.RESET_ALL} {Fore.GREEN}[2]{Style.RESET_ALL} Полный архив (с вложениями) {Fore.GREEN}│{Style.RESET_ALL}") print(f"{Fore.GREEN}└────────────────────────────────────────────────────────────┘{Style.RESET_ALL}") while True: choice = input(f"{Fore.YELLOW}→ Введите номер режима (1 или 2): {Style.RESET_ALL}").strip() if choice in ('1', '2'): return int(choice) print(f"{Fore.RED}❌ Пожалуйста, введите 1 или 2{Style.RESET_ALL}") def get_split_size() -> float: """ Запрашивает размер части для разбивки файлов. """ print(f"\n{Fore.CYAN}┌────────────────────────────────────────────────────────────┐{Style.RESET_ALL}") print(f"{Fore.CYAN}│ Разбивка результата на несколько файлов │{Style.RESET_ALL}") print(f"{Fore.CYAN}├────────────────────────────────────────────────────────────┤{Style.RESET_ALL}") print(f"{Fore.CYAN}│ • 0 — всё в один файл │{Style.RESET_ALL}") print(f"{Fore.CYAN}│ • 10–50 — разбивка по МБ (рекомендуется) │{Style.RESET_ALL}") print(f"{Fore.CYAN}└────────────────────────────────────────────────────────────┘{Style.RESET_ALL}") while True: try: value = float(input(f"{Fore.YELLOW}→ Максимальный размер части в МБ (0 - один файл): {Style.RESET_ALL}").strip()) return max(0, value) except ValueError: print(f"{Fore.RED}❌ Введите числовое значение (например: 0, 10, 50){Style.RESET_ALL}") # ========================== ГЛАВНАЯ ФУНКЦИЯ ЗАПУСКА ========================== def main(): """ Точка входа в программу. """ # 1. Определяем режим работы mode = Config.DEFAULT_MODE if mode is None: mode = get_user_mode() else: logger.info(f"Используем режим {mode} из конфигурации") # 2. Проверяем, что указана ссылка на тему if not Config.START_URL or Config.START_URL.endswith('topic/'): logger.error("❌ ОШИБКА: Укажите корректный START_URL в настройках!") logger.error(" Откройте файл, найдите строку:") logger.error(" START_URL: str = 'https://...'") logger.error(" и вставьте ссылку на вашу тему.") sys.exit(1) # 3. Подготавливаем папку для сохранения output_dir = validate_output_dir(Config.DEFAULT_OUTPUT_DIR, Config.FOLDER_NAME) # 4. Запускаем парсер в выбранном режиме try: if mode == 1: # ───────────────────────────────────────────── # РЕЖИМ 1: Только текст # ───────────────────────────────────────────── print(f"\n{Fore.GREEN}📝 Режим: Только текст{Style.RESET_ALL}") print(f" Формат: {Config.TEXT_FORMAT.upper()}") # Спрашиваем про разбивку, если не задано в конфиге split_mb = Config.SPLIT_SIZE_MB if split_mb is None or split_mb == 0: split_mb = get_split_size() parser = TextParser( start_url=Config.START_URL, output_dir=output_dir, format=Config.TEXT_FORMAT, split_size_mb=split_mb ) success = parser.run() else: # ───────────────────────────────────────────── # РЕЖИМ 2: Полный архив # ───────────────────────────────────────────── print(f"\n{Fore.GREEN}📦 Режим: Полный архив{Style.RESET_ALL}") print(f" {Fore.YELLOW}⚠️ Скачивание вложений временно отключено{Style.RESET_ALL}") print(f" {Fore.YELLOW}⚠️ Используется текстовый парсинг{Style.RESET_ALL}") archiver = FullArchiver( start_url=Config.START_URL, output_dir=output_dir ) success = archiver.run() # 5. Финальный отчёт print(f"\n{Fore.GREEN}┌────────────────────────────────────────────────────────────┐{Style.RESET_ALL}") if success: print(f"{Fore.GREEN}│ ✅ РАБОТА ЗАВЕРШЕНА УСПЕШНО │{Style.RESET_ALL}") else: print(f"{Fore.YELLOW}│ ⚠️ РАБОТА ЗАВЕРШЕНА С ПРЕДУПРЕЖДЕНИЯМИ │{Style.RESET_ALL}") print(f"{Fore.GREEN}├────────────────────────────────────────────────────────────┤{Style.RESET_ALL}") print(f"{Fore.GREEN}│ 📁 Файлы сохранены в: │{Style.RESET_ALL}") print(f"{Fore.GREEN}│ {output_dir.absolute()} │{Style.RESET_ALL}") print(f"{Fore.GREEN}├────────────────────────────────────────────────────────────┤{Style.RESET_ALL}") print(f"{Fore.GREEN}│ 📋 Структура результата: │{Style.RESET_ALL}") if Config.TEXT_FORMAT == 'json': print(f"{Fore.GREEN}│ • posts.json — данные в формате JSON │{Style.RESET_ALL}") else: print(f"{Fore.GREEN}│ • posts.csv — данные в формате CSV (открывается в Excel) │{Style.RESET_ALL}") if Config.SPLIT_SIZE_MB > 0: print(f"{Fore.GREEN}│ • posts_part001.json, posts_part002.json... │{Style.RESET_ALL}") print(f"{Fore.GREEN}└────────────────────────────────────────────────────────────┘{Style.RESET_ALL}") except KeyboardInterrupt: logger.warning(f"\n{Fore.YELLOW}⚠️ Прервано пользователем{Style.RESET_ALL}") sys.exit(130) except Exception as e: logger.exception(f"{Fore.RED}❌ Критическая ошибка: {type(e).__name__}: {e}{Style.RESET_ALL}") sys.exit(1) # ========================== ЗАПУСК ПРОГРАММЫ ========================== # Этот блок запускает main() только если файл запущен напрямую # (не при импорте как модуль) if __name__ == '__main__': main() Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
Qreeq #1951 46 минут назад (изменено) Нахуй ты коды свои бездарные кидаешь, с точки зрения именно кода пизда полная там у тебя написана. Нужны результаты, обзоры на юзеров. Закинь лучше в своего мутанта @Linde вот эту подругу. А топик надо поискать про диссер вот он Изменено 38 минут назад пользователем Qreeq Цитата Поделиться сообщением Ссылка на сообщение
yellyex #1952 30 минут назад Qreeq написал 15 минут назад: Нахуй ты коды свои бездарные кидаешь, с точки зрения именно кода пизда полная там у тебя написана. Нужны результаты, обзоры на юзеров. Закинь лучше в своего мутанта @Linde вот эту подругу. А топик надо поискать про диссер вот он Ок. Qreeq понравилось это Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение
Herokills #1953 17 минут назад yellyex написал 3 часа назад: Качаешь python последней версии. Устанавливаешь все библиотеки. Если не знаешь какие, то скопируй код в нейронку бесплатную квег или дип сик, и узнай как установить. Затем пользуйся сам на здоровье. И последний код с ошибками. Сейчас веду исправление. И уточни, что за Соня и в какой теме. Пожалуйста а че ты айти макакой не работаешь? уже давно бы закрыл все потребности с финансами на землянку Цитата Показать содержимое Поделиться сообщением Ссылка на сообщение
yellyex #1954 14 минут назад Herokills написал 1 минуту назад: yellyex написал 3 часа назад: Качаешь python последней версии. Устанавливаешь все библиотеки. Если не знаешь какие, то скопируй код в нейронку бесплатную квег или дип сик, и узнай как установить. Затем пользуйся сам на здоровье. И последний код с ошибками. Сейчас веду исправление. И уточни, что за Соня и в какой теме. Пожалуйста а че ты айти макакой не работаешь? уже давно бы закрыл все потребности с финансами на землянку Я не умею. Тем более работать. Я в программировании только отдыхаю. Да не пошло чё то. Qreeq Цитата moonfangtopich написал 29.08.2019 в 14:57: У вас недостаточно широкий кругозор, пацаны Я странствия этого еблана видел в покерных топанах, а потом в таверне - это один из самых безумных людей на форуме. Я искренне надеялся, что его зов о помощи останется незамеченным, но нет, нашелся доброволец и вот уже три страницы мы пожинаем плоды Поделиться сообщением Ссылка на сообщение