l3khub.com

블로그스팟 원클릭 검색엔진 색인요청 #12 - 수정된 전체 소스 코드

이번편에는 빙웹마스터, 구글서치콘솔, 네이버 서치어드바이저에 색인요청하는 소스코드를 일부 수정하여 사용하기 편리하게 만든 최종 버전을 공유합니다. 사용자의 블로그스팟 블로그ID와 사용자정보만 변경하면 바로 사용할 수 있도록 수정하였습니다. 

각-검색엔진에-색인요청하는-전체소스코드

블로그스팟 원클릭 검색엔진 색인요청 #12 - 수정된 전체 소스 코드



전체 소스 코드

구글서치콘솔, 빙웹마스터, 네이버 서치어드바이저 각각에 색인요청을 하는 과정을 알아보았습니다. 여기서는 각각의 소스를 일부 수정하고, 사용자가 쉽게 본인의 환경에 맞게 변경할 수 있도록 소스코드를 수정하였습니다. 

import sys
import requests
from PyQt5.QtWidgets import *
from PyQt5.QtGui import *
from PyQt5 import uic
from datetime import datetime
from PyQt5.QtCore import QDate, Qt

from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
import io
import os
import pickle
import time

from oauth2client.service_account import ServiceAccountCredentials
from googleapiclient.discovery import build 
from googleapiclient.http import BatchHttpRequest
import httplib2
import json

from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver import ActionChains
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import pyperclip

from tkinter.messagebox import askyesno

#UI파일 연결
#단, UI파일은 Python 코드 파일과 같은 디렉토리에 위치해야한다.
form_class = uic.loadUiType("auto-submit.ui")[0]

#변수선언

#이미 수집요청을 한 URL를 저장하는 파일
submitted_urls ="D:/Workplace/submitted_url.txt"
#Blogspot 블로그에 새로운 글이 있는지 확인하기 위해 사용하는 JSON 파일
blogspot_check_json = 'your_blogger.apps.googleusercontent.com.json'
#Bing API Key code
bing_api_key = 'your_bing_API_key'
#Google Search Console API JSON 파일
gsc_key_json = "your_google_search_console_api.json"
#naver ID, Password
naver_id = "your_naver_id"
naver_pw = "your_naver_pw"
#도메인 url
domain_url = 'https://your_blogspot.com'
#blog addr, id,
blog = [['your_blogspot.com',   'your_blogspot_id']]

# If modifying these SCOPES, delete the file token.pickle.
SCOPES = ['https://www.googleapis.com/auth/blogger']

def logwrite(data) :
    output = io.StringIO()
    sys.stdout = output
    print(data)
    sys.stdout = sys.__stdout__
    output_str = output.getvalue()
    return output_str

#화면을 띄우는데 사용되는 Class 선언
class WindowClass(QMainWindow, form_class) :
    def __init__(self) :
        super().__init__()
        self.setupUi(self)

        self.setFixedSize(1500, 860)

        self.btn1.clicked.connect(self.pbtn1)
        self.btn2.clicked.connect(self.pbtn2)

        self.chk1.stateChanged.connect(self.chkfunc1)
        self.chk2.stateChanged.connect(self.chkfunc2)
        self.chk3.stateChanged.connect(self.chkfunc3)

        global lines

        #submited_urls.txt 읽기
        f = open(submitted_urls, 'r')
        lines = f.readlines()
        for line in lines:
            self.text2.append(line.replace('\n', ''))
        f.close()

    def pbtn1(self) :
        #now = QDate.currentDate()
        #self.text2.append(now.toString())

        # Blogspot에 오늘 날짜로 새로 포스팅 된 글 링크 가져오기
        def service_setup():
            creds = None
            if os.path.exists('token.pickle'):
                with open('token.pickle', 'rb') as token:
                    creds = pickle.load(token)
            # If there are no (valid) credentials available, let the user log in.
            if not creds or not creds.valid:
                if creds and creds.expired and creds.refresh_token:
                    creds.refresh(Request())
                else:
                    flow = InstalledAppFlow.from_client_secrets_file(blogspot_check_json, SCOPES)
                    creds = flow.run_local_server(port=0)
                # Save the credentials for the next run
                with open('token.pickle', 'wb') as token:
                    pickle.dump(creds, token)

            try:
                return build('blogger', 'v3', credentials=creds)
            except Exception as e:
                #print(e)
                self.text3.append(logwrite(e))
                return None

        def get_latest_post_url(service, blog_id):
            try:
                posts = service.posts()
                request = posts.list(blogId=blog_id)
                posts_doc = request.execute()
                # Assuming the latest post is the first one
                latest_post = posts_doc['items'][0]
                #print('The URL of the latest post is: {}'.format(latest_post['url']))
                self.text1.append(latest_post['url'])
            except Exception as e:
                #print(e)
                self.text3.append(logwrite(e))

        def get_today_posts(service, blog_id, blog):
            try:
                posts = service.posts()
                request = posts.list(blogId=blog_id)
                posts_doc = request.execute()

                # Get today's date
                today = datetime.now().strftime("%Y-%m-%d")

                cnt = 0
                for item in posts_doc['items']:
                    # Get the published date of the post
                    published_date = item['published'][:10] # Blogger API returns the published date in the format "YYYY-MM-DDTHH:MM:SS.SSZ"

                    if published_date == today:
                        #print('The URL of the today\'s post is: {}'.format(item['url']))
                        s = item['url']
                        ss = s.replace('http:', 'https:')
                        self.text1.append(ss)
                        cnt = cnt + 1
                if cnt == 0: self.text3.append(blog + " 에 오늘 작성한 포스팅이 없습니다.")
            except Exception as e:
                #print(e)
                self.text3.append(logwrite(e))

        service = service_setup()
        if service:            
            for chk_blg in blog:
                #각블로그에서 오늘 날짜로 포스팅된 모든 글 가져오기
                get_today_posts(service, chk_blg[1],chk_blg[0])

    def pbtn2(self) :
        new_urls = self.text1.toPlainText().strip().split("\n")
        chk_urls = self.text2.toPlainText().strip().split("\n")

        # 리스트 a를 순회하며 중복된 값을 찾음
        for value in new_urls[:]: # a[:]를 사용하여 리스트 a의 사본을 만듬
            if value in chk_urls:
                # 중복된 값에 대해 사용자에게 삭제할지 확인
                answer = askyesno(title="중복 URL 체크", message= value + " 은 이미 등록요청한 이력이 있습니다.\n제외할까요?")

                # 사용자가 'yes'를 입력하면 해당 값을 삭제
                if answer == True:
                    new_urls.remove(value)

        if len(new_urls) != 0 :
            #Bing Webmaster 이 체크되어 있을 경우
            self.text3.append("Bing Webmaster에 URLs를 제출합니다.")
            if self.chk1.isChecked() : 
                url = "https://ssl.bing.com/webmaster/api.svc/json/SubmitUrlbatch?apikey=" + bing_api_key

                data = {
                "siteUrl": domain_url,
                "urlList": new_urls
                }
                response = requests.post(url, json=data)
                self.text3.append(logwrite(response.json()))
                self.text3.append('Bing Webmaster에 수집요청을 하였습니다\n\n')

            #Google Search Console 이 체크되어 있을 경우
            if self.chk2.isChecked() : 
                #Google Search Console 등록
                self.text3.append("Google Search Console에 URLs를 제출합니다.")
                
                SCOPES = [ "https://www.googleapis.com/auth/indexing" ]
                ENDPOINT = "https://indexing.googleapis.com/v3/urlNotifications:publish"
                
                # Authorize credentials
                credentials = ServiceAccountCredentials.from_json_keyfile_name(gsc_key_json, scopes=SCOPES)
                http = credentials.authorize(httplib2.Http())

                # Build service
                service = build('indexing', 'v3', credentials=credentials)
                
                def insert_event(request_id, response, exception):
                    if exception is not None:
                        #print(exception)
                        self.text3.append(logwrite(exception))

                    else:
                        #print(response)
                        self.text3.append(logwrite(response))

                batch = service.new_batch_http_request(callback=insert_event)
                
                for url in new_urls:
                    batch.add(service.urlNotifications().publish(
                        body={"url": url, "type": 'URL_UPDATED'}))
                
                batch.execute()
                self.text3.append('Google Search Console에 수집요청을 하였습니다\n\n')

            #Naver Search Advisor 이 체크되어 있을 경우
            if self.chk3.isChecked() : 
                driver = webdriver.Chrome()
                
                driver.implicitly_wait(10)
                wait = WebDriverWait(driver, 10)
                act = ActionChains(driver)

                driver.get('https://searchadvisor.naver.com/console/board')

                # id, pw 입력할 곳을 찾습니다.
                tag_id = driver.find_element(By.ID, 'id')
                tag_pw = driver.find_element(By.ID, 'pw')

                # ID 입력
                tag_id.clear()
                tag_id.click()
                pyperclip.copy(naver_id)
                tag_id.send_keys(Keys.CONTROL, 'v')

                # PW 입력
                tag_pw.click()
                pyperclip.copy(naver_pw)
                tag_pw.send_keys(Keys.CONTROL, 'v')

                # 로그인 버튼 클릭
                login_btn = driver.find_element(By.ID, 'log.login')
                login_btn.click()

                nav_urls = list()
                nav_urls = new_urls

                nav_urls.sort()

                old_blog = ''
                for url in nav_urls:
                    cur_blog = url.split('/')[2]
                    #URL을 제출해야 할 블로그가 여러개일 경우 각각의 블로그로 이동해서 등록할 수 있도록 하기 위한 코드
                    if old_blog != cur_blog:
                        old_blog = cur_blog
                        driver.get('https://searchadvisor.naver.com/console/site/request/crawl?site=https%3A%2F%2F' + cur_blog) 

                    tag_input = driver.find_element(By.XPATH, '/html/body/div/div/div/div/main/div/div[2]/div[2]/div/div[2]/div[2]/div[1]/div[2]/div[2]/div[1]/div[2]/div/div/div[1]/div/div/div/div/input')
                    btn_click = driver.find_element(By.XPATH, '//*[@id="app"]/div/main/div/div[2]/div[2]/div/div[2]/div[2]/div[1]/div[2]/div[2]/div[1]/div[2]/div/div/div[2]/button')                    

                    uu = url.split('/')[-1].strip('\n')

                    #현재 페이지에 수집요청하려고 하는 URL이 이미 있다면 time.sleep()를 사용하고 없다면 wait.until() 사용
                    #네이버 서치어드바이져에서는 수집보류가 나는 경우가 많기 때문에 한페이지에 동일한 URL이 있을 수 있음
                    is_existed = False
                    page_source = driver.page_source
                    if uu in page_source:
                        is_existed = True

                    #tag_input.clear() 가 제대로 작동하지 않아 tag_input필드에서 먼저 Ctrl+a로 전체선택후 delete키로 삭제하고 새로운 url 입력
                    act.send_keys_to_element(tag_input, Keys.CONTROL + 'a').send_keys_to_element(tag_input, Keys.DELETE).\
                        send_keys_to_element(tag_input, url).click(btn_click).perform()

                    if is_existed:
                        time.sleep(1.5)
                    else:
                        wait.until(EC.presence_of_element_located((By.PARTIAL_LINK_TEXT, uu)))

                    self.text3.append('네이버에 ' + url + ' 을(를) 수집요청하였습니다')
                
                time.sleep(2)

            #서치 엔진에 URL를 등록한 후 History에 URLS를 추가
            now = time
            str = now.strftime('\n%Y-%m-%d %H:%M:%S\n')
            self.text2.append(str)
            def listToString(str_list):
                result = ""
                for s in str_list:
                    result += s + "\n"
                return result.strip()
            self.text2.append(listToString(new_urls))
            
            f = open(submitted_urls, 'a')
            f.write(str)
            for url in new_urls:
                f.write(url+'\n')
            f.close()

        else:
            self.text3.append("서치엔진에 등록할 URL를 입력하세요")            

    def chkfunc1(self) : 
        if self.chk1.isChecked() : self.text3.append ("Bing Webmaster URLs 제출 체크") 
        else : self.text3.append("Bing Webmaster URLs 제출 체크 해제")

    def chkfunc2(self) : 
        if self.chk2.isChecked() : self.text3.append ("Bing Webmaster URLs 제출 체크") 
        else : self.text3.append("Bing Webmaster URLs 제출 체크 해제")

    def chkfunc3(self) : 
        if self.chk3.isChecked() : self.text3.append ("Bing Webmaster URLs 제출 체크") 
        else : self.text3.append("Bing Webmaster URLs 제출 체크 해제")

if __name__ == "__main__" :
    app = QApplication(sys.argv) 
    myWindow = WindowClass() 
    myWindow.show()
    app.exec_()




To Top