
[SOLVED] How to get conflicting files with same relative paths from multiple folders?
I want to make a virtual file system from a few folders and want to check if there are any conflicting files. So I want to provide a few folders and get files with the same path relative to their folders. How can I find the conflicts? This is what I've done so far. The `get_files` and `remove_duplicates` functions aren't working as I expected.

```python
import os
import shutil
import sys
from collections import Counter
from pathlib import Path
from typing import List


def main():
    folders = sys.argv[1:]
    if len(folders) < 2:
        print("Please provide at least 2 folders")
        exit(1)
    files = get_files(folders)
    conflicting_files = find_conflicting_files(files)
    conflicting_files = remove_duplicates(conflicting_files)
    print_conflicting_files(conflicting_files)


def get_files(folders):
    files = []
    for folder in folders:
        files.extend([os.path.relpath(path, folder) for path in Path(folder).rglob("*")])
    return files


def test_get_files():
    try:
        os.makedirs("test/folder1/a", exist_ok=True)
        os.makedirs("test/folder2/b", exist_ok=True)
        open("test/folder1/a/file", "w").close()
        open("test/folder2/b/file", "w").close()
        folders = ["test/folder1", "test/folder2"]
        assert get_files(folders) == ["a/file", "b/file"]
    finally:
        shutil.rmtree("test")


def find_conflicting_files(files) -> List:
    return [file for file, cnt in Counter(files).items() if cnt > 1]


def test_find_conflicting_files():
    files = [
        ["a", "b", "c"],
        ["a", "b", "d"],
        ["a", "b", "e"],
        ["a", "b", "f"],
    ]
    assert find_conflicting_files(files) == ["a", "a", "a", "b", "b", "b"]


def remove_duplicates(l: List) -> List:
    return [*set(l)]


def test_remove_duplicates():
    files = ["a", "a", "b", "b", "c", "c"]
    assert remove_duplicates(files) == ["a", "b", "c"]


def print_conflicting_files(files):
    for file in files:
        print(file)


if __name__ == "__main__":
    main()
```
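
In case a working baseline helps for comparison, here is a minimal sketch of one way to collect conflicting relative paths directly. It assumes the goal is simply to report relative paths that appear under more than one of the given folders; note that `rglob("*")` also yields directories, so the sketch filters to regular files:

```python
import sys
from collections import Counter
from pathlib import Path


def find_conflicts(folders):
    """Return relative paths that exist under more than one of the given folders."""
    rel_paths = []
    for folder in folders:
        root = Path(folder)
        # rglob("*") yields directories too, so keep only regular files.
        rel_paths.extend(str(p.relative_to(root)) for p in root.rglob("*") if p.is_file())
    # A relative path counted more than once means two folders provide the same file.
    return sorted(path for path, count in Counter(rel_paths).items() if count > 1)


if __name__ == "__main__":
    for conflict in find_conflicts(sys.argv[1:]):
        print(conflict)
```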

Find leaf folders that aren't hidden folders
I have a folder structure with some epubs and json files in the bottom-most folders (not counting the `.ts` folders). I'm exporting tags from the json files to TagSpaces by creating a `.ts` folder with some other `json` files. I've already processed part of the files and now I want to find the leaf folders that don't have a `.ts` folder in their path, so I can find the remaining files without having to process the others twice. I want to process the files in the directories as I find them instead of getting a list of directories and then looping through them. In the example below I've returned the list of directories only to be able to test it. So for this example I only want to do something for the folder `t5`:

    test
    ├── t1
    │   ├── t2
    │   │   └── t5
    │   └── t3
    │       └── .ts
    └── .ts
        └── t4

This is what I've tried:

    import os
    import shutil
    from typing import List


    def process_files_in_leaf_subdirectories(dir: str) -> List[str]:
        dirs = []
        for root, subdirs, filenames in os.walk(dir):
            if subdirs or '.ts' in root:
                continue
            dirs.append(root)
        return dirs


    def test_process_files_in_leaf_subdirectories():
        os.makedirs('tmp/t1/t2/t5', exist_ok=True)
        os.makedirs('tmp/t1/t3/.ts', exist_ok=True)
        os.makedirs('tmp/.ts/t4', exist_ok=True)
        assert process_files_in_leaf_subdirectories('tmp') == ['tmp/t1/t2/t5']
        shutil.rmtree('tmp')

The next example works fine, but it gets the list of directories instead of processing the files as they are found:

    import os
    import shutil
    from pathlib import Path
    from typing import List


    def process_files_in_leaf_dir(leaves: List[Path]) -> List[str]:
        files = []
        for dir in leaves:
            for meta_file in dir.glob("*.json"):
                files.append(meta_file)
        return files


    def find_leaf_dirs(root_path: Path) -> Path:
        # filter subdirectories
        child_dirs = [path for path in root_path.iterdir() if path.is_dir()]
        # if there is no child dir, yield and return
        if not child_dirs:
            yield root_path
            return
        # otherwise iterate through the subdirs
        for path in child_dirs:
            # ignore hidden dirs
            if path.stem[0] == ".":
                continue
            # step in and yield recursively
            yield from find_leaf_dirs(path)


    def test_process_files_in_leaf_dir():
        os.makedirs('tmp/t1/t2/t5', exist_ok=True)
        os.makedirs('tmp/t1/t3/.ts', exist_ok=True)
        os.makedirs('tmp/.ts/t4', exist_ok=True)
        Path('tmp/t1/t2/t5/test.json').touch()
        Path('tmp/t1/t3/test.json').touch()
        Path('tmp/t1/t3/.ts/test.json').touch()
        Path('tmp/.ts/t4/test.json').touch()
        leaves = list(find_leaf_dirs(Path('tmp')))
        assert process_files_in_leaf_dir(leaves) == [Path('tmp/t1/t2/t5') / 'test.json']
        shutil.rmtree('tmp')

[context](https://git.disroot.org/hirrolot19/AO3Scraper/src/commit/bfe85bef77bffa0c84dd8481af50c2ac9097fc62/export_tags.py#L57-L80)
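
Not a definitive answer, but here is a minimal `os.walk`-based sketch of the "process as you go" behaviour described above. It assumes that a folder containing a `.ts` subfolder counts as already processed, and it relies on `os.walk`'s top-down mode, where removing entries from `subdirs` in place stops the walk from descending into them:

```python
import os


def process_leaf_dirs(root: str) -> None:
    """Process .json files in leaf folders that have not been tagged yet."""
    for current, subdirs, filenames in os.walk(root):
        if ".ts" in subdirs:
            # This folder already has a .ts folder, so it was processed before.
            # Skip it and make sure os.walk does not descend into .ts itself.
            subdirs.remove(".ts")
            continue
        if not subdirs:
            # 'current' is an unprocessed leaf: handle its files right here.
            for name in filenames:
                if name.endswith(".json"):
                    print(os.path.join(current, name))  # replace with the real processing
```

On the example tree this visits only `t5`, because `t3` and the top-level folder contain a `.ts` folder and everything below `.ts` is pruned.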

Scraping website asynchronously with aiohttp_scraper library using pool of proxies
I'm on Linux and I've installed `redis` and started it in the background, which is required by [aiohttp_scraper][1]. The library's documentation didn't say I had to install `redis` myself, so maybe there is some other step I need to take before I can use it. The following code works fine:

    import asyncio
    from aiohttp_scraper import Proxies
    from aiohttp_scraper import ScraperSession as ClientSession
    from urllib.request import urlopen


    def scrape():
        TEST_URL = "https://books.toscrape.com/catalogue/"
        urls = [f"{TEST_URL}page-{str(i)}.html" for i in range(1, 5)]
        scraper = WebScraper(urls)
        asyncio.run(scraper.run())
        print(scraper.master_dict)


    def get_proxies() -> Proxies:
        PROXY_URL = "https://raw.githubusercontent.com/TheSpeedX/SOCKS-List/master/http.txt"
        proxies = urlopen(PROXY_URL).read().decode('utf-8').splitlines()
        return Proxies(
            proxies=proxies,
            redis_uri="redis://localhost:6379",
            window_size_in_minutes=5,
            max_requests_per_window=300,
        )


    class WebScraper(object):
        def __init__(self, urls):
            self.urls = urls
            self.proxies = get_proxies()
            self.master_dict = {}

        async def run(self):
            loop = asyncio.get_event_loop()
            async with ClientSession(loop=loop) as session:
                tasks = [loop.create_task(self.fetch(session, url)) for url in self.urls]
                await asyncio.gather(*tasks)

        async def fetch(self, session, url):
            async with session.get(url) as response:
                print(response.status)
                self.master_dict[url] = await response.text()


    if __name__ == "__main__":
        scrape()

But if I change the `ClientSession` line in `run()` to

    async with ClientSession(loop=loop, proxies=self.proxies) as session:

then the code hangs every time I execute it. The only thing I see in the output is:

    ❯ python test.py
    Task was destroyed but it is pending!
    task: <Task pending name='Task-6' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>
    Task was destroyed but it is pending!
    task: <Task pending name='Task-7' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>
    Task was destroyed but it is pending!
    task: <Task pending name='Task-8' coro=<RedisConnection._read_data() running at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:186> wait_for=<Future pending cb=[Task.task_wakeup()]> cb=[RedisConnection.__init__.<locals>.<lambda>() at /home/user/.local/lib/python3.10/site-packages/aioredis/connection.py:168]>

[1]: https://github.com/jgontrum/aiohttp-scraper
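
One generic way to narrow down where it hangs (not specific to aiohttp_scraper) is to put a timeout around the run and dump whatever tasks are still pending; this is plain `asyncio`, and the timeout value below is only illustrative:

```python
import asyncio


async def run_with_timeout(scraper, timeout: float = 30.0) -> None:
    """Run the scraper but give up after `timeout` seconds and report pending tasks."""
    try:
        await asyncio.wait_for(scraper.run(), timeout=timeout)
    except asyncio.TimeoutError:
        pending = [t for t in asyncio.all_tasks() if not t.done()]
        print(f"Timed out; {len(pending)} task(s) still pending:")
        for task in pending:
            print(f"  {task!r}")


# Usage: asyncio.run(run_with_timeout(WebScraper(urls)))
```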

Scrape a website asynchronously using a list of tor circuits
I want to scrape a website asynchronously using a list of tor circuits with different exit nodes, making sure each exit node only makes a request every 5 seconds. For testing purposes I'm using the website https://books.toscrape.com/ and I'm lowering the sleep time, the number of circuits and the number of pages to scrape. It works fine without tor, but I get the following error when I use tor:

    2022-09-06 11:08:49,380 [DEBUG] Loaded 10 authorities dir
    2022-09-06 11:08:49,383 [DEBUG] Loaded 141 fallbacks dir
    2022-09-06 11:08:49,383 [DEBUG] Using selector: EpollSelector
    2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
    2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
    2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
    2022-09-06 11:08:49,384 [ERROR] '_GeneratorContextManager' object has no attribute 'create_stream'
    {}

This is the code:

    import asyncio
    import aiohttp
    import logging

    from docopt import docopt
    from torpy import TorClient
    from typing import Dict, List

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler("debug.log"),
            logging.StreamHandler()
        ]
    )
    logger = logging.getLogger(__name__)


    def main():
        """
        Usage:
            scraper.py <url>... [--tor]
            scraper.py -h | --help

        Options:
            -h --help  Show this screen.
            --tor      Use tor to scrape website
        """
        args = docopt(main.__doc__)
        urls = args['<url>']
        tor = args['--tor']
        scrape_website(urls, tor)


    def scrape_test_website() -> None:
        TEST_URL = "https://books.toscrape.com/catalogue/"
        urls = [f"{TEST_URL}page-{str(i)}.html" for i in range(1, 5)]
        print(scrape_website(urls, tor=True))


    def scrape_website(urls: List[str], tor: bool = False) -> Dict:
        if tor:
            scraper = TorWebScraper(urls)
        else:
            scraper = WebScraper(urls)
        asyncio.run(scraper.run())
        return scraper.master_dict


    class WebScraper(object):
        def __init__(self, urls: List[str]):
            self.urls = urls
            self.all_data = []
            self.master_dict = {}

        async def fetch(self, url: str) -> str:
            try:
                async with aiohttp.ClientSession() as session:
                    async with session.get(url) as response:
                        text = await response.text()
                        return url, text
            except Exception as e:
                logger.error(e)

        async def run(self) -> None:
            tasks = []
            for url in self.urls:
                tasks.append(self.fetch(url))
            self.all_data = await asyncio.gather(*tasks)
            for data in self.all_data:
                if data is not None:
                    url = data[0]
                    self.master_dict[url] = {'raw_html': data[1]}


    def get_circuits(n: int = 2) -> List:
        """
        Get a list of one-hop tor circuits with different nodes
        """
        circuits = []
        with TorClient() as tor:
            for _ in range(n):
                circuits.append(tor.create_circuit())
        return circuits


    class TorWebScraper(WebScraper):
        def __init__(self, urls: List[str]):
            super().__init__(urls)
            self.circuits = get_circuits(2)

        async def fetch(self, url: str) -> str:
            try:
                async with aiohttp.ClientSession() as session:
                    for circuit in self.circuits:
                        async with circuit.create_stream() as stream:
                            async with session.get(url, proxy=stream.proxy) as response:
                                await asyncio.sleep(20e-3)
                                text = await response.text()
                                return url, text
            except Exception as e:
                logger.error(e)


    if __name__ == '__main__':
        #main()
        scrape_test_website()
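
The error message suggests that `tor.create_circuit()` returns a context manager (the kind produced by `contextlib.contextmanager`), so it has to be entered with `with` before anything like `create_stream` is available; the circuits also appear to be tied to the enclosing `with TorClient()` block, so collecting them in `get_circuits` and using them after that block has closed may not work. For reference, a minimal sketch of the nested usage shown in torpy's documented synchronous example; the host, hop count and arguments here are only illustrative, and combining this with aiohttp would still need extra work:

```python
from torpy import TorClient

with TorClient() as tor:
    # The circuit only exists inside this `with` block.
    with tor.create_circuit(3) as circuit:
        # Streams are created from the *entered* circuit, not from the context manager itself.
        with circuit.create_stream(("books.toscrape.com", 80)) as stream:
            stream.send(b"GET / HTTP/1.0\r\nHost: books.toscrape.com\r\n\r\n")
            print(stream.recv(1024))
```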

Import tags to wutag. CBOR serialization of a dictionary?
> [Wutag][1] tags are currently stored in a way that makes them unique to wutag. They might get picked up by other programs, but they won't look the same. The reason why is that tags get serialized in cbor and prefixed with `wutag.` for easier future extraction

as can be seen [here][2]:

    fn hash(&self) -> Result<String> {
        serde_cbor::to_vec(&self)
            .map(|tag| format!("{}.{}", WUTAG_NAMESPACE, base64::encode(tag)))
            .map_err(Error::from)
    }

How can I serialize the tags in Python? This is what I've tried:

    import base64
    import pytest
    import xattr

    WUTAG_NAMESPACE = "user.wutag"


    def hash(tag: Dict) -> str:
        return f"{WUTAG_NAMESPACE}.{pickle.dumps(tag).encode('base64', 'strict')}"
        # AttributeError: 'bytes' object has no attribute 'encode'
        #return f"{WUTAG_NAMESPACE}.{base64.b64encode(json.dumps(tag))}"
        # TypeError: a bytes-like object is required, not 'str'


    def test_hash():
        tag = {'name': 'tag1', 'color': '#000000'}
        assert hash('tag') == 'user.wutag.dGFn'


    def hash_tags(tags: List[Dict]) -> List[str]:
        return [hash(tag) for tag in tags]


    def import_tags_wutag(path: str, tags: List[str]) -> List[str]:
        xattr.setxattr(path, WUTAG_NAMESPACE, hash_tags(tags))

Here is another try that doesn't seem to work either:

    def write_wutag_tags(parent_folder: str, tags: List[str]) -> None:
        for format in FORMAT_LIST:
            filepath = get_filepath(parent_folder, format)
            subprocess.run(['wutag', 'set', filepath, *tags])

This last one fails with the following error if any of the tags is empty; otherwise it works fine.

    error: The argument '<tags>...' requires a value but none was supplied

    USAGE:
        wutag set <pattern> <tags>...

    For more information try --help

[1]: https://github.com/vv9k/wutag
[2]: https://github.com/vv9k/wutag/blob/6d697957cc87ad75c380b6cd7f7ecaee2ef83182/wutag_core/tag.rs#L99-L102
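
For the CBOR part specifically, here is a minimal sketch using the third-party `cbor2` package (`pip install cbor2`). It is only an assumption that encoding the tag as a plain CBOR map reproduces exactly what wutag's `serde_cbor::to_vec` produces for its `Tag` struct (field names and order may differ), so the result should be compared against an attribute written by wutag itself:

```python
import base64

import cbor2  # third-party: pip install cbor2

WUTAG_NAMESPACE = "user.wutag"


def wutag_hash(tag: dict) -> str:
    """Serialize a tag dict to CBOR, base64-encode it, and add the wutag prefix."""
    encoded = base64.b64encode(cbor2.dumps(tag)).decode("ascii")
    return f"{WUTAG_NAMESPACE}.{encoded}"


print(wutag_hash({"name": "tag1", "color": "#000000"}))
```

For the `wutag set` error, filtering out empty strings before calling the command (e.g. `tags = [t for t in tags if t]`) would avoid passing an empty `<tags>...` argument.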

Need help parsing arguments for a script
I want to be able to call [this script](https://git.disroot.org/hirrolot19/AO3Scraper/src/commit/9315d6d339e5b760251fa27bd1e8016019f56793/extras/import_ao3_tags.py) with a single id, a list of ids, a url, or a csv file. Is there a simple way to express it? Here I've only added options to call the script with nothing, a single id or a list of ids, but it already looks too messy for my taste.

    def parse_args():
        try:
            if len(sys.argv) == 3:
                # List
                if sys.argv[2].startswith("[") and sys.argv[2].endswith("]"):
                    work_ids = sys.argv[2].strip("[").strip("]").split(",")
                    download_ao3_fics_and_import_tags(work_ids)
                else:
                    # single work_id
                    download_ao3_fic_and_import_tags(sys.argv[2])
            elif len(sys.argv) != 3:
                main()
            else:
                usage()
        except Exception:
            usage()
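
A hedged sketch of how the standard-library `argparse` module could express this: one positional argument that accepts one or more values, with the kind of each value guessed per item. The dispatch rules (URL prefix, `.csv` suffix, id as the last URL path segment) are assumptions about the accepted formats, and the final hand-off to `download_ao3_fics_and_import_tags` from the script above is left as a comment:

```python
import argparse
import csv


def parse_args(argv=None):
    parser = argparse.ArgumentParser(description="Download AO3 works and import their tags.")
    # One or more inputs: work ids, URLs, or paths to CSV files of ids.
    parser.add_argument("inputs", nargs="+", help="work id(s), URL(s) or CSV file(s) of ids")
    return parser.parse_args(argv)


def classify(value: str) -> str:
    """Guess what kind of input this is (an assumption about the accepted formats)."""
    if value.startswith(("http://", "https://")):
        return "url"
    if value.endswith(".csv"):
        return "csv"
    return "work_id"


def main():
    args = parse_args()
    work_ids = []
    for value in args.inputs:
        kind = classify(value)
        if kind == "csv":
            with open(value, newline="") as f:
                work_ids.extend(row[0] for row in csv.reader(f) if row)
        elif kind == "url":
            work_ids.append(value.rstrip("/").rsplit("/", 1)[-1])  # assumes the id is the last path segment
        else:
            work_ids.append(value)
    print(work_ids)  # hand off to download_ao3_fics_and_import_tags(work_ids) here


if __name__ == "__main__":
    main()
```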

Looking for a guide to scrape concurrently with tor
Like this guide but with concurrency: [Scraping Websites with Python, Selenium, and Tor](https://scribe.rip/m/global-identity?redirectUrl=https%3A%2F%2Fpython.plainenglish.io%2Fthe-big-data-heist-a6b073b30de5) It doesn't need to have anything about Selenium.

How to wait until connected to the tor network with selenium?
Following the guide [Scraping Websites with Python, Selenium, and Tor][1], I don't know how I could wait only until the tor browser is connected instead of waiting a fixed amount of time. Something like the commented-out code below, which isn't working right now. The `torStatus` element is just an example and doesn't really exist; I'm looking for an actual way to tell when it is connected.

    def connect_tor() -> None:
        driver.find_element(By.XPATH, '//*[@id="connectButton"]').click()
        #while driver.find_element(By.XPATH, '//*[@id="torStatus"]').get_attribute("value") != "connected":
        #    time.sleep(1)
        time.sleep(20)

[1]: https://scribe.rip/m/global-identity?redirectUrl=https%3A%2F%2Fpython.plainenglish.io%2Fthe-big-data-heist-a6b073b30de5
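
Selenium's explicit waits may be a better fit than polling by hand. A minimal sketch with `WebDriverWait`; the locator here is hypothetical, since it depends on which element actually appears once the Tor browser finishes connecting:

```python
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC


def connect_tor(driver, timeout: int = 120) -> None:
    """Click the connect button, then block until a post-connection element appears."""
    driver.find_element(By.XPATH, '//*[@id="connectButton"]').click()
    # Hypothetical locator: replace with an element that only shows up once connected.
    WebDriverWait(driver, timeout).until(
        EC.presence_of_element_located((By.ID, "torStatus"))
    )
```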

Looking for a fast multi-process shared dict
I have lots of multiprocessing processes which have to add to and search in a dict. Deletion of values is not needed. At the moment I'm using `multiprocessing.Manager()` and `dict = manager.dict()`. This works pretty well, but I think the manager is a huge bottleneck here. Any ideas? It has to run on older Python 3 versions, otherwise I would use this cool thing I found: https://github.com/ronny-rentner/UltraDict
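
Not a full answer, but since values are never deleted, one cheap mitigation is a per-process read-through cache in front of the manager dict, so repeated lookups of the same key stop paying the IPC cost. A minimal sketch, assuming a key's value never changes once written:

```python
from multiprocessing import Manager

# Per-process cache in front of the shared Manager dict.
_local_cache = {}


def put(shared, key, value):
    shared[key] = value          # one round-trip to the manager process
    _local_cache[key] = value    # remember locally too


def get(shared, key, default=None):
    if key in _local_cache:      # hit: no IPC at all
        return _local_cache[key]
    value = shared.get(key, default)   # miss: ask the manager once
    if value is not default:
        _local_cache[key] = value
    return value


if __name__ == "__main__":
    manager = Manager()
    shared = manager.dict()
    put(shared, "a", 1)
    print(get(shared, "a"))
```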

[Solved] Show progress bar of an ffmpeg video conversion
I'm trying to print a progress bar while executing ffmpeg but I'm having trouble getting the total number of frames and the current frame being processed. The error I get is `AttributeError: 'NoneType' object has no attribute 'groups'`. I've copied the function from [this program][1] and for some reason it works there but not here, even though I haven't changed that part.

[main.py][2]

```
pattern_duration = re.compile(
    'duration[ \t\r]?:[ \t\r]?(.+?),[ \t\r]?start', re.IGNORECASE)
pattern_progress = re.compile('time=(.+?)[ \t\r]?bitrate', re.IGNORECASE)


def execute_ffmpeg(self, manager, cmd):
    proc = expect.spawn(cmd, encoding='utf-8')
    self.update_progress_bar(proc, manager)
    self.raise_ffmpeg_error(proc)


def update_progress_bar(self, proc, manager):
    total = self.get_total_frames(proc)
    cont = 0
    pbar = self.initialize_progress_bar(manager)
    try:
        proc.expect(pattern_duration)
        while True:
            progress = self.get_current_frame(proc)
            percent = progress / total * 100
            pbar.update(percent - cont)
            cont = percent
    except expect.EOF:
        pass
    finally:
        if pbar is not None:
            pbar.close()


def raise_ffmpeg_error(self, proc):
    proc.expect(expect.EOF)
    res = proc.before
    res += proc.read()
    exitstatus = proc.wait()
    if exitstatus:
        raise ffmpeg.Error('ffmpeg', '', res)


def initialize_progress_bar(self, manager):
    pbar = None
    pbar = manager.counter(
        total=100,
        desc=self.path.rsplit(os.path.sep, 1)[-1],
        unit='%',
        bar_format=BAR_FMT,
        counter_format=COUNTER_FMT,
        leave=False
    )
    return pbar


def get_total_frames(self, proc):
    return sum(map(lambda x: float(
        x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))


def get_current_frame(self, proc):
    proc.expect(pattern_progress)
    return sum(map(lambda x: float(
        x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
```

[1]: https://github.com/JavierOramas/video-diet/blob/966b5192902e55bd60ae06a5df195ec41bcd5d71/diet_video/__init__.py
[2]: https://codeberg.org/LifeSymbiont/reduce_video_size/src/commit/ef3cd0974ecd1c4d0a17b6394499650c9fc3da2b/main.py
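
Since the post is marked as solved, this may already be known, but a likely cause of the `NoneType` error is the ordering: `get_total_frames` reads `proc.match` before any `proc.expect(pattern_duration)` has run, so `proc.match` is still `None`. A hedged sketch of `update_progress_bar` with the expect moved before the total is computed, reusing the helpers from the snippet above:

```python
def update_progress_bar(self, proc, manager):
    pbar = self.initialize_progress_bar(manager)
    cont = 0
    try:
        # Match the duration line first, so that proc.match is populated
        # before get_total_frames() reads proc.match.groups().
        proc.expect(pattern_duration)
        total = self.get_total_frames(proc)
        while True:
            progress = self.get_current_frame(proc)
            percent = progress / total * 100
            pbar.update(percent - cont)
            cont = percent
    except expect.EOF:
        pass
    finally:
        if pbar is not None:
            pbar.close()
```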

[Solved] Need to understand and simplify this function that converts a video with ffmpeg while showing a progress bar
This function is giving me an error but I've copied it from [the video-diet program](https://github.com/JavierOramas/video-diet/blob/81a5df4ad27e8cd6fff1be4974067631343a4354/diet_video/__init__.py#L42) and I don't really understand it, so I can't simplify it. Could someone who understands it explain it step by step?

```py
def convert_video_progress_bar(self, manager, cmd):
    # Name of the file being converted (last component of the path), used as the bar label.
    name = self.path.rsplit(os.path.sep, 1)[-1]
    # Run ffmpeg through pexpect so its output can be matched with the regexes.
    proc = expect.spawn(cmd, encoding='utf-8')
    pbar = None
    try:
        # Wait for the "Duration: ..." line and convert HH:MM:SS.ms into seconds.
        proc.expect(pattern_duration)
        total = sum(map(lambda x: float(
            x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
        cont = 0
        # Progress bar that goes from 0 to 100%.
        pbar = manager.counter(
            total=100,
            desc=name,
            unit='%',
            bar_format=BAR_FMT,
            counter_format=COUNTER_FMT,
            leave=False
        )
        while True:
            # Each "time=..." line gives the position reached so far, converted to seconds the same way.
            proc.expect(pattern_progress)
            progress = sum(map(lambda x: float(
                x[1])*60**x[0], enumerate(reversed(proc.match.groups()[0].strip().split(':')))))
            percent = progress/total*100
            # Advance the bar by the difference since the last update.
            pbar.update(percent-cont)
            cont = percent
    except expect.EOF:
        # ffmpeg stopped producing output, so the loop above ends here.
        traceback.print_exc()
    finally:
        if pbar is not None:
            pbar.close()
    # Collect the remaining output and raise if ffmpeg exited with an error code.
    proc.expect(expect.EOF)
    res = proc.before
    res += proc.read()
    exitstatus = proc.wait()
    if exitstatus:
        raise ffmpeg.Error('ffmpeg', '', res)
```

[SOLVED] Get list of strings from a messed multiline string
I want to do the following but I don't know how to get the name and percentage:

`mymodule_test.py`

```
import unittest
from unittest import TestCase

from mymodule import get_wanted_cards, get_percentage


class GetCardsTestCase(TestCase):
    def test_get_wanted_cards(self):
        s = '''
        ↑ A Little Chat 92 (1%) 0 (0%) NEW
        ↑ An Offer You Can't Refuse 88 (87%) 0 (0%) NEW
        ↑ Angelic Observer 92 (91%) 0 (0%) NEW
        '''
        expected = ["4 An Offer You Can't Refuse", "4 Angelic Observer"]
        actual = get_wanted_cards(s)
        self.assertEqual(actual, expected)

    def test_get_percentage(self):
        s = "92 (1%)"
        expected = 1
        actual = get_percentage(s)
        self.assertEqual(actual, expected)


if __name__ == '__main__':
    unittest.main()
```

`mymodule.py`

```
import re
from typing import List


def get_wanted_cards(s: str) -> List[str]:
    res = []
    for line in s.splitlines():
        array = line.split("\t")
        if len(array) < 5:
            continue
        name = array[1]
        percent = get_percentage(array[2])
        if percent >= 50:
            res += "4 " + name
    return res


def get_percentage(s: str) -> int:
    return int(re.match(r'\(([0-9]*)%\)', s).group(1))


if __name__ == "__main__":
    pass
```
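
Two things stand out in `mymodule.py`: `re.match` anchors at the start of the string, so it never matches `"92 (1%)"` (where `re.search` would), and `res += "4 " + name` extends the list character by character where `res.append(...)` is probably intended. A hedged sketch of how those pieces could look, assuming the columns really are tab-separated as `split("\t")` implies:

```python
import re
from typing import List


def get_percentage(s: str) -> int:
    """Extract the number inside the parentheses of a string like '92 (1%)'."""
    match = re.search(r'\((\d+)%\)', s)  # re.search scans the whole string; re.match would not
    return int(match.group(1)) if match else 0


def get_wanted_cards(s: str) -> List[str]:
    res = []
    for line in s.splitlines():
        fields = line.split("\t")  # assumes tab-separated columns
        if len(fields) < 5:
            continue
        name = fields[1]
        if get_percentage(fields[2]) >= 50:
            res.append("4 " + name)  # append the whole string, not its characters
    return res
```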

Python Programming Introduction [MOOC]
cross-posted from: https://lemmy.ml/post/138510

> By University of Helsinki

Ask specific questions about how to code something in python

Python docs (tutorial)
Intro to programming - University of Helsinki

General python discussion on lemmy.ml

Heap Overflow
A place to ask programming questions and share free resources

General programming discussion, additional resources, challenges

To post or comment:

  1. Create an account on another lemmy instance
  2. Then search for the community url like here

RULES:

  1. No politics
  2. No flaming / trolling
  3. No proprietary BS
  4. Stay on topic

Please keep questions & examples short!

All content is Public Domain unless otherwise specified.
Only CC0, CC BY 3.0, or CC BY-SA 3.0 alternate licenses are allowed.

No affiliation with StackOverflow.com