-
[ OpenAI / WebsiteQnA tutorial ] 총정리Openai 2023. 2. 28. 20:44반응형
import requests import re import urllib.request from bs4 import BeautifulSoup from collections import deque from html.parser import HTMLParser from urllib.parse import urlparse import os # Regex pattern to match a URL HTTP_URL_PATTERN = r'^http[s]*://.+' # Define root domain to crawl domain = "openai.com" full_url = "https://openai.com/"
1. 데이터 수집 - beautifulsoup 라이브러리를 통한 크롤링
# Create a class to parse the HTML and get the hyperlinks class HyperlinkParser(HTMLParser): def __init__(self): super().__init__() # Create a list to store the hyperlinks self.hyperlinks = [] # Override the HTMLParser's handle_starttag method to get the hyperlinks def handle_starttag(self, tag, attrs): attrs = dict(attrs) # If the tag is an anchor tag and it has an href attribute, add the href attribute to the list of hyperlinks if tag == "a" and "href" in attrs: self.hyperlinks.append(attrs["href"])
HTMLparser 클래스를 상속받는 HyperlinkParser 클래스를 정의합니다. 생성자 오버라이딩을 통해 부모 클래스의 생성자를 호출한 뒤 hyperlinks 변수를 정의합니다. 또한 부모 클래스에 정의되어 있던 메서드 handle_Starttag()를 오버라이딩하여 태그가 anchor 태그이며 href 속성을 지닌 경우 클래스 변수 hyperlinks에 href를 추가하도록 합니다. (HTMLParser에서 feed 메서드는 받은 인자를 처리한 뒤 handle_starttag 등 메서드에 인자로 준 뒤 호출합니다.)
# Function to get the hyperlinks from a URL def get_hyperlinks(url): # Try to open the URL and read the HTML try: # Open the URL and read the HTML with urllib.request.urlopen(url) as response: # If the response is not HTML, return an empty list if not response.info().get('Content-Type').startswith("text/html"): return [] # Decode the HTML html = response.read().decode('utf-8') except Exception as e: print(e) return [] # Create the HTML Parser and then Parse the HTML to get hyperlinks parser = HyperlinkParser() parser.feed(html) return parser.hyperlinks
get_hyperlink 함수는 인자로 받은 URL에서 href를 추출한 뒤 배열에 담아 반환합니다. urllib.request 모듈의 urlopen은 인자로 받은 url의 HTTPResponse Object를 반환하며 해당 객체는 info, read, readline 등의 메서드를 포함합니다. (자세한 내용은 아래 링크에서 확인 가능합니다.) [ with urllib.request.urlopen(url) as response ]에서는 해당 url의 HTTPResponse Object를 response에 저장하고 .info()를 통해 html 문서 여부를 판단한 뒤 html 문서인 경우 utf-8 방식으로 디코딩한 뒤 그 결과를 html 변수에 저장해 앞서 정의한 HyperlinkParser 인스턴스의 feed()에 인자로 전달해 href를 추출합니다. 여기서 with를 사용한 이유는 보다 간결하고 안전하게 리소스를 사용하고 해제하기 위해서입니다.(with 절이 끝나면 open 한 리소스는 자동으로 해제됩니다.)
# Function to get the hyperlinks from a URL that are within the same domain def get_domain_hyperlinks(local_domain, url): clean_links = [] for link in set(get_hyperlinks(url)): clean_link = None # If the link is a URL, check if it is within the same domain if re.search(HTTP_URL_PATTERN, link): # Parse the URL and check if the domain is the same url_obj = urlparse(link) if url_obj.netloc == local_domain: clean_link = link # If the link is not a URL, check if it is a relative link else: if link.startswith("/"): link = link[1:] elif link.startswith("#") or link.startswith("mailto:"): continue clean_link = "https://" + local_domain + "/" + link if clean_link is not None: if clean_link.endswith("/"): clean_link = clean_link[:-1] clean_links.append(clean_link) # Return the list of hyperlinks that are within the same domain return list(set(clean_links))
get_domain_hyperlinks 함수에서는 앞서 정의한 get_hyperlinks 함수를 통해 얻은 href 들을 정리하는 함수입니다. 각각의 href가 absolute URL인지 확인 한 뒤 맞다면, urlparse 함수를 사용해 href의 domain을 추출한 뒤 local_domain과 비교하고 동일하다면 마지막 부분의 '/'를 제거하고 clean_links 배열에 추가합니다. 만약 absolute URL이 아닌 relative URL이라면, href가 '/'로 시작하는 경우 이를 제거한 뒤 마지막 부분의 '/'를 제거하고 clean_links 배열에 추가한다 그 외의 경우 continue를 통해 생략하고 다음 href에 대해 동일한 작업을 진행합니다.
def crawl(url): # Parse the URL and get the domain local_domain = urlparse(url).netloc # Create a queue to store the URLs to crawl queue = deque([url]) # Create a set to store the URLs that have already been seen (no duplicates) seen = set([url]) # Create a directory to store the text files if not os.path.exists("text/"): os.mkdir("text/") if not os.path.exists("text/"+local_domain+"/"): os.mkdir("text/" + local_domain + "/") # Create a directory to store the csv files if not os.path.exists("processed"): os.mkdir("processed") # While the queue is not empty, continue crawling while queue: # Get the next URL from the queue url = queue.pop() print(url) # for debugging and to see the progress # Save text from the url to a <url>.txt file with open('text/'+local_domain+'/'+url[8:].replace("/", "_") + ".txt", "w") as f: # Get the text from the URL using BeautifulSoup soup = BeautifulSoup(requests.get(url).text, "html.parser") # Get the text but remove the tags text = soup.get_text() # If the crawler gets to a page that requires JavaScript, it will stop the crawl if ("You need to enable JavaScript to run this app." in text): print("Unable to parse page " + url + " due to JavaScript being required") # Otherwise, write the text to the file in the text directory f.write(text) # Get the hyperlinks from the URL and add them to the queue for link in get_domain_hyperlinks(local_domain, url): if link not in seen: queue.append(link) seen.add(link) crawl(full_url)
crawl 함수에서는 앞서 정의한 함수와 클래스를 활용해 url에 대한 크롤링을 진행합니다. 접근할 각 url을 queue에 담은뒤 활용하며, set 자료구조를 활용해 방문 여부를 확인하기 위한 seen 변수를 선언합니다. 각 url을 방문하기 전에 크롤링한 text 파일을 저장할 디렉토리와 csv파일을 저아할 디렉토리를 생성합니다.
각 url에 대한 방문은 while 문과 queue를 통해 이루어집니다. 각 순회과정에서는 dequeue 연산(이 경우 .pop())을 통해 방문할 url을 url 변수에 저장하며 해당 url을 title로 가지는 파일을 생성하고 작성합니다. 파일에 쓰여지는 텍스트는 BeautifulSoup() 객체를 통해 얻어지는데, 이때 BeautifulSoup() 객체는 requests 라이브러리를 이용해 보낸 get 요청에 대한 응답과 parser 타입을 인자로 받아 생성됩니다. 예시를 통핸 이해를 원한다면 아래 링크에서 가능합니다.
또한, for문을 통해 get_domain_hyperlinks()의 반환값(링크들)을 순회하며 방문하지 않은 링크를 queue에 추가한뒤 방문처리하여 모든 링크를 방문하도록 합니다. 이러한 과정을 거쳐 crawl 함수는 각 페이지의 내용을 포함하는 텍스트 파일을 url을 제목으로 하여 저장합니다.
2. 데이터 가공 - tiktoken 라이브러리를 데이터 프로세
def remove_newlines(serie): serie = serie.str.replace('\n', ' ') serie = serie.str.replace('\\n', ' ') serie = serie.str.replace(' ', ' ') serie = serie.str.replace(' ', ' ') return serie
remove_newlines(serie)는 python의 Series는 1차원 배열과 같은 자료구조입니다. Series 객체 생성시 따로 인덱스를 할당하지 않는다면 0부터 시작되는데 자세한 사항은 아래 링크에서 확인 가능합니다. 뭐 어쨌든 이함수는 pandas의 Series 관련 객체를 인자로 받은 뒤 줄바꿈, 중복된 띄어쓰기, \ 등을 제거하도록 설계되었습니다.
import pandas as pd # Create a list to store the text files texts=[] # Get all the text files in the text directory for file in os.listdir("text/" + domain + "/"): # Open the file and read the text with open("text/" + domain + "/" + file, "r") as f: text = f.read() # Omit the first 11 lines and the last 4 lines, then replace -, _, and #update with spaces. texts.append((file[11:-4].replace('-',' ').replace('_', ' ').replace('#update',''), text)) # Create a dataframe from the list of texts df = pd.DataFrame(texts, columns = ['fname', 'text']) # Set the text column to be the raw text with the newlines removed df['text'] = df.fname + ". " + remove_newlines(df.text) df.to_csv('processed/scraped.csv') df.head()
앞서 crawl()이 실행 된 뒤 이어서 실행되는 부분입니다. 크롤링후 저장된 .txt 파일을 os.listdir() 함수와 for 문을 통해 순회합니다. 각 순회에서 .txt 파일은 file이란 변수에 저장되며, with open as 구문을 통해 file을 파일명으로하는 파일을 읽어 그 내용을 text 변수에 저장합니다. text/domain/ 디렉토리에 저장된 모든 파일을 (파일명, 텍스트) 꼴로 배열에 저장하며, 이를 바탕으로 pandas DateFrame 객체를 생성합니다. dataframe의 text column에 파일명을 붙이고 기존 내용에 앞서 정의한 remove_newlines() 함수를 적용하는 작업 진행 후, dataframe을 csv 파일로 processed 디렉터리에 저장합니다. 이때 .head()를 사용해 처음 5 행의 데이터를 출력합니다.
import tiktoken # Load the cl100k_base tokenizer which is designed to work with the ada-002 model tokenizer = tiktoken.get_encoding("cl100k_base") df = pd.read_csv('processed/scraped.csv', index_col=0) df.columns = ['title', 'text'] # Tokenize the text and save the number of tokens to a new column df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x))) # Visualize the distribution of the number of tokens per row using a histogram df.n_tokens.hist()
tiktoken 모듈은 OpenAI 모델에서 사용되기 위해 제작된 BPE 토크나이저입니다. tiktoken.get_encoding("encoding name")을 통해 토크나이저를 로드합니다. 이후 .encode() 함수를 통해 텍스트를 토큰으로, 혹은 그 반대로 변환합니다. 아래 표는 각 모델에 대응하는 encoding name을 나타냅니다.
위의 코드는 tokenizer를 로드한 뒤 앞서 저장한 csv의 text 컬럼에 적용해 각 텍스트를 토큰화해 n_token 컬럼을 생성합니다. 이후 .hist() 함수를 통해 n_tokens에 대한 히스토그램으로 나타냅니다. 아래 링크를 통해 tiktoken에 대한 간략한 정리를 확인 가능합니다.
max_tokens = 500 # Function to split the text into chunks of a maximum number of tokens def split_into_many(text, max_tokens = max_tokens): # Split the text into sentences sentences = text.split('. ') # Get the number of tokens for each sentence n_tokens = [len(tokenizer.encode(" " + sentence)) for sentence in sentences] chunks = [] tokens_so_far = 0 chunk = [] # Loop through the sentences and tokens joined together in a tuple for sentence, token in zip(sentences, n_tokens): # If the number of tokens so far plus the number of tokens in the current sentence is greater # than the max number of tokens, then add the chunk to the list of chunks and reset # the chunk and tokens so far if tokens_so_far + token > max_tokens: chunks.append(". ".join(chunk) + ".") chunk = [] tokens_so_far = 0 # If the number of tokens in the current sentence is greater than the max number of # tokens, go to the next sentence if token > max_tokens: continue # Otherwise, add the sentence to the chunk and add the number of tokens to the total chunk.append(sentence) tokens_so_far += token + 1 return chunks
split_into_many(text, max_token = max_tokens) 는 각 텍스트에 대응하는 토큰이 max_tokens 보다 크지 않도록 나누는 기능을 합니다. 입력받은 텍스트를 split() 함수를 통해 문장별로 나누고 이를 토큰화한 결과를 n_tokens 배열로 저장합니다. 청크는 관계된 텍스트의 그룹이라고 볼 수 있으며 split_into_many()에서는 텍스트를 max_tokens을 기준으로 나눠 chunks에 저장합니다. tokens_so_far, chunk는 chunks에 저장할 데이터를 만들기 위한 처리과정에 사용될 변수입니다.
for 문에서는 각 순회에서 sentence와 그에 대응하는 token을 받습니다. 앞서 저장한 토큰인 tokens_so_far과 현재 문장에 대응하는 토큰인 token의 합이 max_tokens 보다 크다면 앞서 chunk를 join() 함수를 이용해 하나의 데이터로 변환한 뒤 chunks에 추가합니다. 이어서 처리에 사용되는 각 변수(chunk, tokens_so_far)을 초기화하고 여기에 현재 문장과 토큰을 추가합니다. 만약 현재 문장에 대응하는 토큰이 max_tokens모다 크다면 해당 문장에 대한 처리는 생략됩니다. 이러한 과정은 인자로 받은 text로 부터 추출된 모든 문장에 대해 진행된 후 max_tokens를 기준으로 나누어진 문장들을 담은 배열인 chunks를 반환합니다.
shortened = [] # Loop through the dataframe for row in df.iterrows(): # If the text is None, go to the next row if row[1]['text'] is None: continue # If the number of tokens is greater than the max number of tokens, split the text into chunks if row[1]['n_tokens'] > max_tokens: shortened += split_into_many(row[1]['text']) # Otherwise, add the text to the list of shortened texts else: shortened.append( row[1]['text'] ) df = pd.DataFrame(shortened, columns = ['text']) df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x))) df.n_tokens.hist()
df.iterrows()는 데이터프레임의 각 인덱스와 행을 튜플로 반환합니다. 위 코드에서는 df에 저장된 text를 max_tokens를 기준으로 처리 후 shortened에 저장합니다. df에서 text의 토큰이 max_tokens보다 큰 경우 앞서 정의한 split_into_many()를 적용한 뒤 += 연산자를 이용해 여러 텍스트를 shortened에 추가하고 max_tokens 보다 작은 경우 .append를 이용해 text를 바로 shortened에 추가합니다. 모든 순회를 끝낸 뒤 shortened를 이용해 df를 새롭게 초기화하고 앞서 했던 것과 동일하게 각 텍스트를 토큰화 한 뒤 히스토그램으로 나타냅니다.
3. Embedding - openai 라이브러리를 통한 Embedding
머신러닝의 자연어 처리(NLP) 관점에서 Embedding은 단어, 구를 벡터로 나타내는 것입니다. 즉, 단어나 구의 의미를 세밀한 벡터로 표현해 기계학습 모델에서 사용할 수 있도록 하는 처리입니다.
import openai from openai.embeddings_utils import distances_from_embeddings df['embeddings'] = df.text.apply(lambda x: openai.Embedding.create(input=x, engine='text-embedding-ada-002')['data'][0]['embedding']) df.to_csv('processed/embeddings.csv') df.head()
df에 text 데이터를 "text-embedding-ada-002" 모델을 통해 임베딩한 결과를 csv파일로 저장합니다. 이때, embedding 결과는 문자열 형식으로 저장됩니다.
import pandas as pd import numpy as np from openai.embeddings_utils import distances_from_embeddings, cosine_similarity df=pd.read_csv('processed/embeddings.csv', index_col=0) df['embeddings'] = df['embeddings'].apply(eval).apply(np.array) df.head()
앞서 저장한 embedding.csg파일을 df로 불러온 후, string인 embedding 결과를 eval() 함수를 통해 list로 변환하여 최종적으로는 넘파이 어레이로 변환합니다.
4. Embedding 이용한 Context 생성 및 응답
def create_context( question, df, max_len=1800, size="ada" ): """ Create a context for a question by finding the most similar context from the dataframe """ # Get the embeddings for the question q_embeddings = openai.Embedding.create(input=question, engine='text-embedding-ada-002')['data'][0]['embedding'] # Get the distances from the embeddings df['distances'] = distances_from_embeddings(q_embeddings, df['embeddings'].values, distance_metric='cosine') returns = [] cur_len = 0 # Sort by distance and add the text to the context until the context is too long for i, row in df.sort_values('distances', ascending=True).iterrows(): # Add the length of the text to the current length cur_len += row['n_tokens'] + 4 # If the context is too long, break if cur_len > max_len: break # Else add it to the text that is being returned returns.append(row["text"]) # Return the context return "\n\n###\n\n".join(returns)
create_context(question, df, max_len=1800, size='data')는 question, df, max_len, size를 입력으로 받아 question 가장 비슷한 맥락의 데이터를 찾습니다. q_embeddings(question을 임베딩한 결과를 담은 변수)과 df의 각 embedding 결과를 비교한 결과를 distances에 담은 뒤 이를 오름차순으로 정렬해 가장 작은값을 가진 데이터부터 순회하며 returns에 추가합니다. 이때 각 데이터가 추가될 때 마다 해당 데이터에 대응하는 토큰을 cur_lens에 추가하여 return되는 값의 크기를 제한합니다. cur_len 에 토큰을 누적할 때 4를 추가적으로 더하는 이유는 마침표때문입니다. 함수가 반환하는 값은 returns의 각 요소를 '\n\n###\n\n'으로 join한 결과값입니다.
def answer_question( df, model="text-davinci-003", question="Am I allowed to publish model outputs to Twitter, without a human review?", max_len=1800, size="ada", debug=False, max_tokens=150, stop_sequence=None ): """ Answer a question based on the most similar context from the dataframe texts """ context = create_context( question, df, max_len=max_len, size=size, ) # If debug, print the raw model response if debug: print("Context:\n" + context) print("\n\n") try: # Create a completions using the question and context response = openai.Completion.create( prompt=f"Answer the question based on the context below, and if the question can't be answered based on the context, say \"I don't know\"\n\nContext: {context}\n\n---\n\nQuestion: {question}\nAnswer:", temperature=0, max_tokens=max_tokens, top_p=1, frequency_penalty=0, presence_penalty=0, stop=stop_sequence, model=model, ) return response["choices"][0]["text"].strip() except Exception as e: print(e) return ""
answer_question(df, model, question, max_len, size, debug, max_tokens, stop_sequence)에서는 앞서 정의한 create_context 함수를 통해 주어진 인자의 컨텍스트에 가장 근접한 데이터를 추출합니다.(debug의 경우 이 데이터를 프린트합니다.) 추출한 데이터는 prompt에 적용되어 completion.create() 함수에 전달되며, 위 코드에서는 davinci-003 모델을 사용해 텍스트를 생성합니다. completion.create의 각 인자에 대한 설명은 아래와 같습니다.
- prompt : 모델이 생성할 내용에 대한 컨텍스트를 제공하는 인자입니다. 무엇을 생성할 것인지에 대한 문자열을 받습니다..
- temperature : 택스트 생성시 랜덤성을 결정합니다. 1에 가까울수록 창의적인 답변이 생성됩니다.
- max_tokens : 모델이 생성할 내용의 최대치를 결정합니다. 토큰을 기준으로 하며 1 토큰이 대략 0.75 단어에 대응됩니다.
- top_p : temperature과 비슷하게 모델의 무작위성을 제어합니다.
- frequency_penalty : 모델이 예측을 반복하는 경향을 줄이도록 제어합니다. 이미 생성된 단어의 확률을 줄입니다.
- presence_penalty : 모델이 새로운 예측을 하도록 권장하는 인자입니다. 단어가 예측된 텍스트에 나타난 경우 단어의 확률을 낮춥니다. frequency_penalty와 달리 과거 예측에서 나타난 빈도에 영향을 받지 않습니다.
- stop : 모델이 텍스트 생성을 멈추는 토큰을 특정합니다. 부적절한 컨텐츠 생성을 방지하는데 사용됩니다.
- model / engine : text generater를 어떤 언어모델을 이용해 실행할지 특정합니다.
자세한 사항은 아래 openai document를 통해 확인 가능합니다.
answer_question(df, question="What day is it?", debug=False) # "I don't know." answer_question(df, question="What is our newest embeddings model?") # 'The newest embeddings model is text-embedding-ada-002.'
실행 예시는 위와 같습니다.
반응형'Openai' 카테고리의 다른 글