How to automate advanced email scraping YouTube videos in python.

YouTube is a place full of content and media. That makes it a great place to collect, and scrape data. In this tutorial, I’ll be explaining how to scrape videos on YouTube with extra methods and features to make this task easy and fun.

Python is a great place to start. Its syntax is beginner friendly and it’s accessibility to libraries makes it easy to scrape with ease. We’ll be using a handful of libraries to help us out. In our case, some major imports we’ll use are: Selenium, multiprocessing, requests, and queue.

Lets start with setting up our imports…

# ----- Imports and Library's ----- # 
import requests
import re
import time
import queue
import multiprocessing
from threading import Thread
import sys
# ----- Selenium Library's ----- #
import selenium
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import chromedriver_autoinstaller

Now that we setup all our imports we can start with creating our classes and functions to do the job. We’ll start with creating the class “Scrape”.

So first we initialize our class and create our instance variables. We have our search query which is the videos we search for, we have link count which is the minimum amount of links we want (unless times out), we have our view and sub count variables which are set to get the maximum amount of subs or views allowed per video, and finally we have self.url which creates the URL to open and adds '&sp=CAO%253D' which gives us the most recently uploaded videos.

# Declarations
self.filtered_links = queue.Queue()
self.links = set()
self.timeout = False
self.links_gathered = False
self.position = 0
self.manager = multiprocessing.Manager()
self.return_dict = self.manager.dict()
self.channels = [] # Used to store viewed channels and their subs

Now add this snippet of code right after the code inside the __init__ . Here we declare some variables such as: self.filtered_links = queue.Queue() which creates us our queue that has no duplicate urls, self.links = set() ensures we don’t save any duplicates, self.timeout is for a timeout function we create incase gathering links takes too long, self.position is for our starting position to render the YouTube’s page. We need to use JavaScript to keep rendering the page to gather more links. self.manager and self.return_dict is used to store our emails. And finally we have self.channels which is used to keep track of channels seen and their subscriber count (We do so to make the code faster and more efficient).

# Driver Setup and Options   chromedriver_autoinstaller.install(cwd=True) # Get latest version of chromedriver // Cwd=True -> Saves driver in directory   
self.options = webdriver.ChromeOptions() self.options.add_argument("--incognito") self.options.add_argument("--log-level=OFF") self.options.add_argument('--disable-gpu') self.options.add_argument("--headless") # Sets headless mode for no window popup
self.__FetchVideos() # Fetch URLs

Now we setup Selenium’s drivers and options and call our FetchVideosfunction. We use the library chromedriver_autoinstaller so that we always get the correct updated version of chromedriver automatically. We set some arguments like --incognito so it runs chrome in incognito mode. We also use --headless so chrome doesn’t pop up and runs seamlessly in the background.

Now lets create our FetchVideos function, which gathers the URLs matching our parameters. We start by creating the function __FetchVideos(self): and declare some variables. Then we start up and open chrome driver. We use with which is a context manager. It ensures the code can open and close safely. We then call driver.get(self.url) which opens chrome and gets the URL to the videos page. We then create a thread for our timer function and start it.

Add this right under the code before. Here we render the URLs to gather. We use a try and except to catch any errors (typically YouTube popping up captcha). If we catch an error, we will close driver just in case and exit the code. Starting off we have a while loop. It loops until the number of links is gathered or timeout is set to True. Then we run a for loop using WebDriverWait until all elements are visible. We use XPATH selector to get the URLs of the videos. Then if we want sub count or view count parameters to be checked, we go into the if statement find the views element. In subscribers case, we have to get the channel and use our function that we make __subFinder(channel) to get the sub count. If the channel is already stored in our list, return the sub count. Then we run it through our if statement to check it’s the right requirements then if so, add URL to self.links . Finally we add 10000 to self.positionand execute some JavaScript to render more videos. We add all our non-duplicate links to the queue and set self.links_gathered = True.

Now lets create our __timeoutFuncfunction and __subFinderfunction. Lets start with our __subFinderfunction.

Here we create our function and take in a URL parameter. We use requests to get the page source then using regex, we try to find the subscriber count. YouTube makes it hard and messy by using their own custom tags interpreted by their JavaScript code so we had to improvise. If we get an error it most likely means they have their subscriber count set to hidden. Then we add the channel and subs to our list and return the sub count.

Here we declare our count is 60 seconds and say if sub count is enabled, add another 60 seconds because it takes longer to fetch sub count. We set it for 50 links a minute and if sub count, 50 links per 2 minutes. So if we have 100 links to fetch and no sub count required, it’ll take 1 minute max before returning the links gathered so far.

Now earlier you saw this function being called, cstn. That’s another function we created to convert string numbers like “5k” or “50,000" to be an integer. Here what the function looks like.

Now lets get onto the email scraping part. We’ll be using multi-processing to make it faster and more efficient, taking advantage of our computers processing power.

Here we create our worker function. This function gets the URLs page from the queue, uses regex to look for an email and calls our function fix_email to fix any broken emails like techysavage@gmail... would be techysavage@gmail.com. Then we save it to our manager dictionary and release the lock. Below is our fix email function which tries to just fix any minor mistakes.

# ----- Fix Email Function ----- #
def fix_email(email):

Finally we have our run function which handles it all. First we check if inputted CPU count is a digit, if not then we check if the parameter says half or full. Half means it uses half of the logical processors or uses CPU usage at around 50% whereas full uses them all. We create our semaphore lock so that it can only uses a specific amount of logical processors. Then we create our scrape object and run it. Once it’s finished we call our start_processes function and run the workers. When it’s all finished, we get our emails saved without any duplicates in a set, and create a returnable object which has the total time and emails stored in a dictionary.

Thank you for reading through and I hope you’ve learned something new and this code helps. You can check the code out on my GitHub or copy the full code below. 👍 https://github.com/Ashura-R/Youtube-Email-Scraper/

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store