import requests
import re
import urllib.request
from bs4 import BeautifulSoup
from collections import deque
from html.parser import HTMLParser
from urllib.parse import urlparse
import os

# Regex pattern to match a URL
HTTP_URL_PATTERN = r'^http[s]*://.+'

domain = "openai.com"  # <- put your domain to be crawled
full_url = "https://openai.com/"  # <- put your domain to be crawled with https or http

# Create a class to parse the HTML and get the hyperlinks
class HyperlinkParser(HTMLParser):
    def __init__(self):
        super().__init__()
        # Create a list to store the hyperlinks
        self.hyperlinks = []

    # Override the HTMLParser's handle_starttag method to get the hyperlinks
    def handle_starttag(self, tag, attrs):
        attrs = dict(attrs)

        # If the tag is an anchor tag and it has an href attribute, add the href attribute to the list of hyperlinks
        if tag == "a" and "href" in attrs:
            self.hyperlinks.append(attrs["href"])
The next function takes a URL as an argument, opens it, and reads the HTML content. It then returns all of the hyperlinks found on that page.
# Function to get the hyperlinks from a URL
def get_hyperlinks(url):

    # Try to open the URL and read the HTML
    try:
        # Open the URL and read the HTML
        with urllib.request.urlopen(url) as response:

            # If the response is not HTML, return an empty list
            if not response.info().get('Content-Type').startswith("text/html"):
                return []

            # Decode the HTML
            html = response.read().decode('utf-8')
    except Exception as e:
        print(e)
        return []

    # Create the HTML Parser and then Parse the HTML to get hyperlinks
    parser = HyperlinkParser()
    parser.feed(html)

    return parser.hyperlinks
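Before wiring this into the crawler, it can help to sanity-check the parser on the start page. A quick, optional check, using the full_url defined above:

# Optional sanity check: print the first few raw hyperlinks found on the start page
links = get_hyperlinks(full_url)
print(links[:5])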
# Function to get the hyperlinks from a URL that are within the same domain
def get_domain_hyperlinks(local_domain, url):
    clean_links = []
    for link in set(get_hyperlinks(url)):
        clean_link = None

        # If the link is a URL, check if it is within the same domain
        if re.search(HTTP_URL_PATTERN, link):
            # Parse the URL and check if the domain is the same
            url_obj = urlparse(link)
            if url_obj.netloc == local_domain:
                clean_link = link

        # If the link is not a URL, check if it is a relative link
        else:
            if link.startswith("/"):
                link = link[1:]
            elif link.startswith("#") or link.startswith("mailto:"):
                continue
            clean_link = "https://" + local_domain + "/" + link

        if clean_link is not None:
            if clean_link.endswith("/"):
                clean_link = clean_link[:-1]
            clean_links.append(clean_link)

    # Return the list of hyperlinks that are within the same domain
    return list(set(clean_links))
def crawl(url):
    # Parse the URL and get the domain
    local_domain = urlparse(url).netloc

    # Create a queue to store the URLs to crawl
    queue = deque([url])

    # Create a set to store the URLs that have already been seen (no duplicates)
    seen = set([url])

    # Create a directory to store the text files
    if not os.path.exists("text/"):
        os.mkdir("text/")

    if not os.path.exists("text/" + local_domain + "/"):
        os.mkdir("text/" + local_domain + "/")

    # Create a directory to store the csv files
    if not os.path.exists("processed"):
        os.mkdir("processed")

    # While the queue is not empty, continue crawling
    while queue:

        # Get the next URL from the queue
        url = queue.pop()
        print(url)  # for debugging and to see the progress

        # Save text from the url to a <url>.txt file
        with open('text/' + local_domain + '/' + url[8:].replace("/", "_") + ".txt", "w", encoding="UTF-8") as f:

            # Get the text from the URL using BeautifulSoup
            soup = BeautifulSoup(requests.get(url).text, "html.parser")

            # Get the text but remove the tags
            text = soup.get_text()

            # If the crawler gets to a page that requires JavaScript, it will stop the crawl
            if "You need to enable JavaScript to run this app." in text:
                print("Unable to parse page " + url + " due to JavaScript being required")

            # Otherwise, write the text to the file in the text directory
            f.write(text)

        # Get the hyperlinks from the URL and add them to the queue
        for link in get_domain_hyperlinks(local_domain, url):
            if link not in seen:
                queue.append(link)
                seen.add(link)

crawl(full_url)
def remove_newlines(serie):
    serie = serie.str.replace('\n', ' ')
    serie = serie.str.replace('\\n', ' ')
    # Collapse double spaces left behind by the replacements above
    serie = serie.str.replace('  ', ' ')
    serie = serie.str.replace('  ', ' ')
    return serie
import pandas as pd

# Create a list to store the text files
texts = []

# Get all the text files in the text directory
for file in os.listdir("text/" + domain + "/"):

    # Open the file and read the text
    with open("text/" + domain + "/" + file, "r", encoding="UTF-8") as f:
        text = f.read()

        # Omit the first 11 characters (the "openai.com_" prefix) and the last 4 characters (".txt")
        # of the filename, replace - and _ with spaces, and drop #update
        texts.append((file[11:-4].replace('-', ' ').replace('_', ' ').replace('#update', ''), text))

# Create a dataframe from the list of texts
df = pd.DataFrame(texts, columns=['fname', 'text'])

# Set the text column to be the raw text with the newlines removed
df['text'] = df.fname + ". " + remove_newlines(df.text)
df.to_csv('processed/scraped.csv')
df.head()
import tiktoken

# Load the cl100k_base tokenizer which is designed to work with the ada-002 model
tokenizer = tiktoken.get_encoding("cl100k_base")

df = pd.read_csv('processed/scraped.csv', index_col=0)
df.columns = ['title', 'text']

# Tokenize the text and save the number of tokens to a new column
df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))

# Visualize the distribution of the number of tokens per row using a histogram
df.n_tokens.hist()
max_tokens = 500

# Function to split the text into chunks of a maximum number of tokens
def split_into_many(text, max_tokens=max_tokens):

    # Split the text into sentences
    sentences = text.split('. ')

    # Get the number of tokens for each sentence
    n_tokens = [len(tokenizer.encode(" " + sentence)) for sentence in sentences]

    chunks = []
    tokens_so_far = 0
    chunk = []

    # Loop through the sentences and tokens joined together in a tuple
    for sentence, token in zip(sentences, n_tokens):

        # If the number of tokens so far plus the number of tokens in the current sentence is greater
        # than the max number of tokens, then add the chunk to the list of chunks and reset
        # the chunk and tokens so far
        if tokens_so_far + token > max_tokens:
            chunks.append(". ".join(chunk) + ".")
            chunk = []
            tokens_so_far = 0

        # If the number of tokens in the current sentence is greater than the max number of
        # tokens, go to the next sentence
        if token > max_tokens:
            continue

        # Otherwise, add the sentence to the chunk and add the number of tokens to the total
        chunk.append(sentence)
        tokens_so_far += token + 1

    return chunks

shortened = []

# Loop through the dataframe
for row in df.iterrows():

    # If the text is None, go to the next row
    if row[1]['text'] is None:
        continue

    # If the number of tokens is greater than the max number of tokens, split the text into chunks
    if row[1]['n_tokens'] > max_tokens:
        shortened += split_into_many(row[1]['text'])

    # Otherwise, add the text to the list of shortened texts
    else:
        shortened.append(row[1]['text'])
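The create_context function below reads an embeddings column from the dataframe and calls distances_from_embeddings, but the intermediate step that produces those embeddings is not shown here. A minimal sketch of that step, assuming the pre-1.0 openai Python package (which exposes openai.Embedding and openai.embeddings_utils) and an illustrative processed/embeddings.csv path:

import openai
from openai.embeddings_utils import distances_from_embeddings

# Rebuild the dataframe from the shortened chunks and recount the tokens per chunk
df = pd.DataFrame(shortened, columns=['text'])
df['n_tokens'] = df.text.apply(lambda x: len(tokenizer.encode(x)))

# Embed each chunk with text-embedding-ada-002 and persist the result
df['embeddings'] = df.text.apply(
    lambda x: openai.Embedding.create(input=x, engine='text-embedding-ada-002')['data'][0]['embedding']
)
df.to_csv('processed/embeddings.csv')

# If the embeddings are later reloaded from the CSV, they come back as strings and
# need to be converted, e.g.:
#   import numpy as np
#   df = pd.read_csv('processed/embeddings.csv', index_col=0)
#   df['embeddings'] = df['embeddings'].apply(eval).apply(np.array)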
def create_context(question, df, max_len=1800, size="ada"):
    """
    Create a context for a question by finding the most similar context from the dataframe
    """

    # Get the embeddings for the question
    q_embeddings = openai.Embedding.create(input=question, engine='text-embedding-ada-002')['data'][0]['embedding']

    # Get the distances from the embeddings
    df['distances'] = distances_from_embeddings(q_embeddings, df['embeddings'].values, distance_metric='cosine')

    returns = []
    cur_len = 0

    # Sort by distance and add the text to the context until the context is too long
    for i, row in df.sort_values('distances', ascending=True).iterrows():

        # Add the length of the text to the current length
        cur_len += row['n_tokens'] + 4

        # If the context is too long, break
        if cur_len > max_len:
            break

        # Else add it to the text that is being returned
        returns.append(row["text"])

    # Return the context
    return "\n\n###\n\n".join(returns)
def answer_question(
    df,
    model="text-davinci-003",
    question="Am I allowed to publish model outputs to Twitter, without a human review?",
    max_len=1800,
    size="ada",
    debug=False,
    max_tokens=150,
    stop_sequence=None
):
    """
    Answer a question based on the most similar context from the dataframe texts
    """
    context = create_context(
        question,
        df,
        max_len=max_len,
        size=size,
    )
    # If debug, print the raw model response
    if debug:
        print("Context:\n" + context)
        print("\n\n")

    try:
        # Create a completions using the question and context
        response = openai.Completion.create(
            prompt=f"Answer the question based on the context below, and if the question can't be answered based on the context, say \"I don't know\"\n\nContext: {context}\n\n---\n\nQuestion: {question}\nAnswer:",
            temperature=0,
            max_tokens=max_tokens,
            top_p=1,
            frequency_penalty=0,
            presence_penalty=0,
            stop=stop_sequence,
            model=model,
        )
        return response["choices"][0]["text"].strip()
    except Exception as e:
        print(e)
        return ""
answer_question(df, question="What day is it?", debug=False)

answer_question(df, question="What is our newest embeddings model?")

answer_question(df, question="What is ChatGPT?")
The answers will look similar to the following:
"I don't know."
'The newest embeddings model is text-embedding-ada-002.'
'ChatGPT is a model trained to interact in a conversational way. It is able to answer followup questions, admit its mistakes, challenge incorrect premises, and reject inappropriate requests.'