Nonciclopedia:Bot/wikipedia.py
# -*- coding: utf-8 -*-
"""
Library to get and put pages on a MediaWiki.

Contents of the library (objects and functions to be used outside)

Classes:
    Page(site, title): A page on a MediaWiki site
    ImagePage(site, title): An image descriptor Page
    Site(lang, fam): A MediaWiki site

Factory functions:
    Family(name): Import the named family
    getSite(lang, fam): Return a Site instance

Exceptions:
    Error:              Base class for all exceptions in this module
    NoUsername:         Username is not in user-config.py
    NoPage:             Page does not exist on the wiki
    NoSuchSite:         Site does not exist
    IsRedirectPage:     Page is a redirect page
    IsNotRedirectPage:  Page is not a redirect page
    LockedPage:         Page is locked
    SectionError:       The section specified in the Page title does not exist
    PageNotSaved:       Saving the page has failed
      EditConflict:     PageNotSaved due to edit conflict while uploading
      SpamfilterError:  PageNotSaved due to MediaWiki spam filter
      LongPageError:    PageNotSaved due to length limit
    ServerError:        Got unexpected response from wiki server
    BadTitle:           Server responded with BadTitle
    UserBlocked:        Client's username or IP has been blocked
    PageNotFound:       Page not found in list

Objects:
    get_throttle:       Call to limit rate of read-access to wiki
    put_throttle:       Call to limit rate of write-access to wiki

Other functions:
    getall(): Load a group of pages via Special:Export
    handleArgs(): Process all standard command line arguments (such as
        -family, -lang, -log and others)
    translate(xx, dict): dict is a dictionary, giving text depending on
        language, xx is a language. Returns the text in the most applicable
        language for the xx: wiki
    setAction(text): Use 'text' instead of "Wikipedia python library" in
        edit summaries
    setUserAgent(text): Sets the string being passed to the HTTP server as
        the User-agent: header. Defaults to 'Pywikipediabot/1.0'.

    output(text): Prints the text 'text' in the encoding of the user's
        console. **Use this instead of "print" statements**
    input(text): Asks input from the user, printing the text 'text' first.
    inputChoice: Shows user a list of choices and returns user's selection.

    showDiff(oldtext, newtext): Prints the differences between oldtext and
        newtext on the screen

Wikitext manipulation functions: each of these takes a unicode string
containing wiki text as its first argument, and returns a modified version
of the text unless otherwise noted --

    replaceExcept: replace all instances of 'old' by 'new', skipping any
        instances of 'old' within comments and other special text blocks
    removeDisabledParts: remove text portions exempt from wiki markup
    isDisabled(text, index): return boolean indicating whether text[index]
        is within a non-wiki-markup section of text
    decodeEsperantoX: decode Esperanto text using the x convention.
    encodeEsperantoX: convert wikitext to the Esperanto x-encoding.
    sectionencode: encode text for use as a section title in wiki-links.
    findmarker(text, startwith, append): return a string which is not part
        of text
    expandmarker(text, marker, separator): return marker string expanded
        backwards to include separator occurrences plus whitespace

Wikitext manipulation functions for interlanguage links:

    getLanguageLinks(text, xx): extract interlanguage links from text and
        return in a dict
    removeLanguageLinks(text): remove all interlanguage links from text
    removeLanguageLinksAndSeparator(text, site, marker, separator = ''):
        remove language links, whitespace, and preceding separators from text
    replaceLanguageLinks(oldtext, new): remove the language links and
        replace them with links from a dict like the one returned by
        getLanguageLinks
    interwikiFormat(links): convert a dict of interlanguage links to text
        (using same dict format as getLanguageLinks)
    interwikiSort(sites, inSite): sorts a list of sites according to
        interwiki sort preference of inSite.
    url2link: Convert urlname of a wiki page into interwiki link format.

Wikitext manipulation functions for category links:

    getCategoryLinks(text): return list of Category objects corresponding
        to links in text
    removeCategoryLinks(text): remove all category links from text
    replaceCategoryLinksAndSeparator(text, site, marker, separator = ''):
        remove category links, whitespace, and preceding separators from text
    replaceCategoryLinks(oldtext, new): replace the category links in
        oldtext by those in a list of Category objects
    replaceCategoryInPlace(text, oldcat, newtitle): replace a single link
        to oldcat with a link to the category given by newtitle
    categoryFormat(links): return a string containing links to all
        Categories in a list.

Unicode utility functions:

    UnicodeToAsciiHtml: Convert unicode to a bytestring using HTML entities.
    url2unicode: Convert url-encoded text to unicode using a site's encoding.
    unicode2html: Ensure unicode string is encodable; if not, convert it to
        ASCII for HTML.
    html2unicode: Replace HTML entities in text with unicode characters.

stopme(): Put this on a bot when it is no longer running, or no longer
    communicating with the wiki. It will remove the bot from the list of
    running processes, and thus not slow down other bot threads anymore.

"""
from __future__ import generators
#
# (C) Pywikipedia bot team, 2003-2007
#
# Distributed under the terms of the MIT license.
#
__version__ = '$Id: wikipedia.py 7179 2009-08-27 10:39:01Z filnik $'

import os, sys
import httplib, socket, urllib, urllib2
import traceback
import time, threading, Queue
import math
import re, codecs, difflib, locale
try:
    from hashlib import md5
except ImportError:             # Python 2.4 compatibility
    from md5 import new as md5
import xml.sax, xml.sax.handler
import htmlentitydefs
import warnings
import unicodedata
import xmlreader
from BeautifulSoup import BeautifulSoup, BeautifulStoneSoup, SoupStrainer
import weakref

# Set the locale to system default. This will ensure correct string
# handling for non-latin characters on Python 2.3.x. For Python 2.4.x it's
# no longer needed.
locale.setlocale(locale.LC_ALL, '')

import config, login, query, version

try:
    set                     # introduced in Python 2.4: faster and future
except NameError:
    from sets import Set as set

# Check Unicode support (is this a wide or narrow python build?)
# See http://www.python.org/doc/peps/pep-0261/
try:
    unichr(66365)           # a character in the th: alphabet, uses 32 bit encoding
    WIDEBUILD = True
except ValueError:
    WIDEBUILD = False


# Local exceptions

class Error(Exception):
    """Wikipedia error"""

class NoUsername(Error):
    """Username is not in user-config.py"""

class NoPage(Error):
    """Page does not exist"""

class NoSuchSite(Error):
    """Site does not exist"""

class IsRedirectPage(Error):
    """Page is a redirect page"""

class IsNotRedirectPage(Error):
    """Page is not a redirect page"""

class InvalidTitle(Error):
    """Invalid page title"""

class LockedPage(Error):
    """Page is locked"""

class SectionError(Error):
    """The section specified by # does not exist"""

class PageNotSaved(Error):
    """Saving the page has failed"""

class EditConflict(PageNotSaved):
    """There has been an edit conflict while uploading the page"""

class SpamfilterError(PageNotSaved):
    """Saving the page has failed because the MediaWiki spam filter
    detected a blacklisted URL."""
    def __init__(self, arg):
        self.url = arg
        self.args = arg,

class LongPageError(PageNotSaved):
    """Saving the page has failed because it is too long."""
    def __init__(self, arg, arg2):
        self.length = arg
        self.limit = arg2

class MaxTriesExceededError(PageNotSaved):
    """Saving the page has failed because the maximum number of attempts
    has been reached"""

class ServerError(Error):
    """Got unexpected server response"""

class BadTitle(Error):
    """Server responded with BadTitle."""

# UserBlocked exceptions should in general not be caught. If the bot has
# been blocked, the bot operator should address the reason for the block
# before continuing.
class UserBlocked(Error):
    """Your username or IP has been blocked"""

class PageNotFound(Error):
    """Page not found in list"""

class CaptchaError(Error):
    """Captcha is asked and config.solve_captcha == False."""

class NoHash(Error):
    """The API doesn't return any hash for the image searched.
    Really strange; better to raise an error."""

SaxError = xml.sax._exceptions.SAXParseException
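
# A minimal illustrative sketch (hypothetical helper, not part of the
# original API): how calling code typically maps the exceptions above
# around a page fetch. The helper name and the default value are made up
# for illustration.
def exampleSafeGet(page, default=u''):
    """Return the text of 'page', or 'default' on the common failures."""
    try:
        return page.get()
    except NoPage:
        # the page does not exist on the wiki
        return default
    except IsRedirectPage:
        # the page is a redirect; the target title is in the exception args
        return default
    except SectionError:
        # the #section given in the title was not found on the page
        return default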

# Pre-compile re expressions
reNamespace = re.compile("^(.+?) *: *(.*)$")
Rwatch = re.compile(
    r"<input type='hidden' value=\"(.*?)\" name=\"wpEditToken\"")
Rwatchlist = re.compile(r"<input tabindex='[\d]+' type='checkbox' "
                        r"name='wpWatchthis' checked='checked'")
Rlink = re.compile(r'\[\[(?P<title>[^\]\|\[]*)(\|[^\]]*)?\]\]')
resectiondecodeescapes = re.compile(r"\.(?=[0-9a-f]{2})", re.I)
resectiondecodeleadingnonalpha = re.compile(r'^x(?=[^a-zA-Z])')


class Page(object):
    """Page: A MediaWiki page

    Constructor has two required parameters:
      1) The wiki Site on which the page resides [note that, if the title
         is in the form of an interwiki link, the Page object may have a
         different Site than this]
      2) The title of the page as a unicode string

    Optional parameters:
      insite - the wiki Site where this link was found (to help decode
               interwiki links)
      defaultNamespace - A namespace to use if the link does not contain one

    Methods available:

    title                 : The name of the page, including namespace and
                            section if any
    urlname               : Title, in a form suitable for a URL
    namespace             : The namespace in which the page is found
    titleWithoutNamespace : Title, with the namespace part removed
    section               : The section of the page (the part of the title
                            after '#', if any)
    sectionFreeTitle      : Title, without the section part
    aslink                : Title in the form [[Title]] or [[lang:Title]]
    site                  : The wiki this page is in
    encoding              : The encoding of the page
    isAutoTitle           : Title can be translated using the autoFormat
                            method
    autoFormat            : Auto-format certain dates and other standard
                            format page titles
    isCategory            : True if the page is a category
    isDisambig (*)        : True if the page is a disambiguation page
    isImage               : True if the page is an image
    isRedirectPage (*)    : True if the page is a redirect, false otherwise
    getRedirectTarget (*) : The page the page redirects to
    isTalkPage            : True if the page is in any "talk" namespace
    toggleTalkPage        : Return the talk page (if this is one, return the
                            non-talk page)
    get (*)               : The text of the page
    latestRevision (*)    : The page's current revision id
    userName              : Last user to edit page
    isIpEdit              : True if last editor was unregistered
    editTime              : Timestamp of the last revision to the page
    previousRevision (*)  : The revision id of the previous version
    permalink (*)         : The url of the permalink of the current version
    getOldVersion(id) (*) : The text of a previous version of the page
    getRestrictions       : Returns a protection dictionary
    getVersionHistory     : Load the version history information from wiki
    getVersionHistoryTable: Create a wiki table from the history data
    fullVersionHistory    : Return all past versions including wikitext
    contributingUsers     : Return set of users who have edited page
    exists (*)            : True if the page actually exists, false
                            otherwise
    isEmpty (*)           : True if the page has 4 characters or less
                            content, not counting interwiki and category
                            links
    interwiki (*)         : The interwiki links from the page (list of
                            Pages)
    categories (*)        : The categories the page is in (list of Pages)
    linkedPages (*)       : The normal pages linked from the page (list of
                            Pages)
    imagelinks (*)        : The pictures on the page (list of ImagePages)
    templates (*)         : All templates referenced on the page (list of
                            Pages)
    templatesWithParams(*): All templates on the page, with list of
                            parameters
    getReferences         : List of pages linking to the page
    canBeEdited (*)       : True if page is unprotected or user has edit
                            privileges
    botMayEdit (*)        : True if bot is allowed to edit page
    put(newtext)          : Saves the page
    put_async(newtext)    : Queues the page to be saved asynchronously
    move                  : Move the page to another title
    delete                : Deletes the page (requires being logged in)
    protect               : Protect or unprotect a page (requires sysop
                            status)
    removeImage           : Remove all instances of an image from this page
    replaceImage          : Replace all instances of an image with another
    loadDeletedRevisions  : Load all deleted versions of this page
    getDeletedRevision    : Return a particular deleted revision
    markDeletedRevision   : Mark a version to be undeleted, or not
    undelete              : Undelete past version(s) of the page

    (*) : This loads the page if it has not been loaded before; permalink
          might even reload it if it has been loaded before

    """
    def __init__(self, site, title, insite=None, defaultNamespace=0):
        try:
            # if _editrestriction is True, it means that the page has been
            # found to have an edit restriction, but we do not know yet
            # whether the restriction affects us or not
            self._editrestriction = False

            if site is None:
                site = getSite()
            elif type(site) is str or type(site) is unicode:
                site = getSite(site)
            self._site = site

            if not insite:
                insite = site

            # Convert HTML entities to unicode
            t = html2unicode(title)

            # Convert URL-encoded characters to unicode.
            # Sometimes users copy the link to a site from one to another.
            # Try both the source site and the destination site to decode.
            try:
                t = url2unicode(t, site = insite, site2 = site)
            except UnicodeDecodeError:
                raise InvalidTitle(u'Bad page title : %s' % t)

            # Normalize unicode string to a NFC (composed) format to allow
            # proper string comparisons. According to
            # http://svn.wikimedia.org/viewvc/mediawiki/branches/REL1_6/phase3/includes/normal/UtfNormal.php?view=markup
            # the mediawiki code normalizes everything to NFC, not NFKC
            # (which might result in information loss).
            t = unicodedata.normalize('NFC', t)

            # Clean up the name; it can come from anywhere.
            # Replace underscores, and runs of spaces and underscores,
            # with a single space, then strip spaces at both ends.
            t = t.replace(u"_", u" ")
            while u"  " in t:
                t = t.replace(u"  ", u" ")
            t = t.strip(u" ")

            # Remove left-to-right and right-to-left markers.
            t = t.replace(u'\u200e', '').replace(u'\u200f', '')
            # leading colon implies main namespace instead of the default
            if t.startswith(':'):
                t = t[1:]
                self._namespace = 0
            else:
                self._namespace = defaultNamespace

            if not t:
                raise InvalidTitle(u"Invalid title '%s'" % title)

            self._namespace = defaultNamespace
            #
            # This code was adapted from Title.php : secureAndSplit()
            #
            # Namespace or interwiki prefix
            while True:
                m = reNamespace.match(t)
                if not m:
                    break
                p = m.group(1)
                lowerNs = p.lower()
                ns = self.site().getNamespaceIndex(lowerNs)
                if ns:
                    t = m.group(2)
                    self._namespace = ns
                    break

                if lowerNs in self.site().family.langs.keys():
                    # Interwiki link
                    t = m.group(2)

                    # Redundant interwiki prefix to the local wiki
                    if lowerNs == self.site().lang:
                        if t == '':
                            raise Error("Can't have an empty self-link")
                    else:
                        self._site = getSite(lowerNs, self.site().family.name)

                    # If there's an initial colon after the interwiki, that
                    # also resets the default namespace
                    if t != '' and t[0] == ':':
                        self._namespace = 0
                        t = t[1:]
                elif lowerNs in self.site().family.get_known_families(site = self.site()):
                    if self.site().family.get_known_families(site = self.site())[lowerNs] \
                            == self.site().family.name:
                        t = m.group(2)
                    else:
                        # This page is from a different family
                        if verbose:
                            output(u"Target link '%s' has different family '%s'"
                                   % (title, lowerNs))
                        if self.site().family.name in ['commons', 'meta']:
                            # When the source wiki is commons or meta,
                            # w:page redirects you to w:en:page
                            otherlang = 'en'
                        else:
                            otherlang = self.site().lang
                        familyName = self.site().family.get_known_families(site = self.site())[lowerNs]
                        if familyName in ['commons', 'meta']:
                            otherlang = familyName
                        try:
                            self._site = getSite(otherlang, familyName)
                        except ValueError:
                            raise NoPage("""\
%s is not a local page on %s, and the %s family is
not supported by PyWikipediaBot!""" % (title, self.site(), familyName))
                        t = m.group(2)
                else:
                    # If there's no recognized interwiki or namespace,
                    # then let the colon expression be part of the title.
                    break

            sectionStart = t.find(u'#')
            if sectionStart > 0:
                self._section = t[sectionStart+1 : ].lstrip(" ")
                self._section = sectionencode(self._section,
                                              self.site().encoding())
                if not self._section:
                    self._section = None
                t = t[ : sectionStart].rstrip(" ")
            elif sectionStart == 0:
                raise InvalidTitle(u"Invalid title starting with a #: '%s'" % t)
            else:
                self._section = None

            if t:
                if not self.site().nocapitalize:
                    t = t[:1].upper() + t[1:]

            # reassemble the title from its parts
            if self._namespace != 0:
                t = self.site().namespace(self._namespace) + u':' + t
            if self._section:
                t += u'#' + self._section

            self._title = t
            self.editRestriction = None
            self.moveRestriction = None
            self._permalink = None
            self._userName = None
            self._ipedit = None
            self._editTime = '0'
            self._startTime = '0'
            # For the Flagged Revisions MediaWiki extension
            self._revisionId = None
            self._deletedRevs = None
        except NoSuchSite:
            raise
        except:
            if verbose:
                output(u"Exception in Page constructor")
                output(u"site=%s, title=%s, insite=%s, defaultNamespace=%i"
                       % (site, title, insite, defaultNamespace))
            raise

    def site(self):
        """Return the Site object for the wiki on which this Page resides."""
        return self._site

    def encoding(self):
        """Return the character encoding used on this Page's wiki Site."""
        return self._site.encoding()
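
    # Illustrative sketch (hypothetical helper, not part of the original
    # API): the constructor above normalizes titles (HTML entities,
    # URL-encoding, underscores, duplicate spaces, NFC, first-letter
    # capitalization), so variant spellings of a link compare equal once
    # wrapped in Page objects, e.g.
    # Page(site, u'main_page')._exampleSameTitle(u'Main  page') -> True.
    def _exampleSameTitle(self, othertitle):
        """Return True if 'othertitle' normalizes to this Page's title."""
        return Page(self.site(), othertitle).title() == self.title()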

    def title(self, underscore = False, savetitle = False, decode=False):
        """Return the title of this Page, as a Unicode string.

        If underscore is True, replace all ' ' characters with '_'.
        If savetitle is True, encode any wiki syntax in the title.
        If decode is True, decode the section title.
        """
        title = self._title
        if decode:
            begin = title.find('#')
            if begin != -1:
                anchor = self.section(underscore = underscore, decode = True)
                title = title[:begin + 1] + anchor
        if savetitle:
            # Ensure there's no wiki syntax in the title
            title = title.replace(u"''", u'%27%27')
        if underscore:
            title = title.replace(' ', '_')
        return title

    def titleWithoutNamespace(self, underscore=False):
        """Return title of Page without namespace and without section."""
        if self.namespace() == 0:
            return self.sectionFreeTitle(underscore=underscore)
        else:
            return self.sectionFreeTitle(underscore=underscore).split(':', 1)[1]

    def titleForFilename(self):
        """Return the title of the page in a form suitable for a filename
        on the user's file system.
        """
        result = self.title()
        # Replace characters that are not possible in file names on some
        # systems.
        # Spaces are possible on most systems, but are bad for URLs.
        for forbiddenChar in ':*?/\\ ':
            result = result.replace(forbiddenChar, '_')
        return result

    def section(self, underscore = False, decode=False):
        """Return the name of the section this Page refers to.

        The section is the part of the title following a '#' character, if
        any. If no section is present, return None.
        """
        section = self._section
        if section and decode:
            section = resectiondecodeleadingnonalpha.sub('', section)
            section = resectiondecodeescapes.sub('%', section)
            section = url2unicode(section, self._site)
            if not underscore:
                section = section.replace('_', ' ')
        return section

    def sectionFreeTitle(self, underscore=False):
        """Return the title of this Page, without the section (if any)."""
        sectionName = self.section(underscore=underscore)
        title = self.title(underscore=underscore)
        if sectionName:
            return title[:-len(sectionName)-1]
        else:
            return title

    def urlname(self):
        """Return the Page title encoded for use in an URL."""
        title = self.title(underscore = True)
        encodedTitle = title.encode(self.site().encoding())
        return urllib.quote(encodedTitle)

    def __str__(self):
        """Return a console representation of the pagelink."""
        return self.aslink().encode(config.console_encoding, 'replace')

    def __repr__(self):
        """Return a more complete string representation."""
        return "%s{%s}" % (self.__class__.__name__, str(self))

    def aslink(self, forceInterwiki=False, textlink=False, noInterwiki=False):
        """Return a string representation in the form of a wikilink.

        If forceInterwiki is True, return an interwiki link even if it
        points to the home wiki. If False, return an interwiki link only if
        needed.

        If textlink is True, always return a link in text form (that is,
        interwiki links and internal links to the Category: and Image:
        namespaces will be preceded by a : character).
        """
        if not noInterwiki and (forceInterwiki or self._site != getSite()):
            colon = ""
            if textlink:
                colon = ":"
            if self._site.family != getSite().family \
                    and self._site.family.name != self._site.lang:
                return u'[[%s%s:%s:%s]]' % (colon, self._site.family.name,
                                            self._site.lang,
                                            self.title(savetitle=True, decode=True))
            else:
                return u'[[%s%s:%s]]' % (colon, self._site.lang,
                                         self.title(savetitle=True, decode=True))
        elif textlink and (self.isImage() or self.isCategory()):
            return u'[[:%s]]' % self.title(savetitle=True, decode=True)
        else:
            return u'[[%s]]' % self.title(savetitle=True, decode=True)
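
    # Illustrative sketch (hypothetical helper, not part of the original
    # API): the common string forms produced by the methods above, side by
    # side. For a category or image page, 'textlink' carries the leading
    # colon that keeps the link textual instead of categorizing the page.
    def _exampleTitleForms(self):
        """Return a dict with the usual renderings of this Page's title."""
        return {
            'title':    self.title(),                # e.g. u'Categoria:Prova'
            'urlname':  self.urlname(),              # e.g. 'Categoria:Prova'
            'aslink':   self.aslink(),               # e.g. u'[[Categoria:Prova]]'
            'textlink': self.aslink(textlink=True),  # e.g. u'[[:Categoria:Prova]]'
        }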

    def autoFormat(self):
        """Return (dictName, value) if title is in date.autoFormat dictionary.

        Value can be a year, date, etc., and dictName is 'YearBC',
        'Year_December', or another dictionary name. Please note that two
        entries may have exactly the same autoFormat, but be in two
        different namespaces, as some sites have categories with the same
        names. Regular titles return (None, None).
        """
        if not hasattr(self, '_autoFormat'):
            import date
            self._autoFormat = date.getAutoFormat(self.site().language(),
                                                  self.titleWithoutNamespace())
        return self._autoFormat

    def isAutoTitle(self):
        """Return True if title of this Page is in the autoFormat dictionary."""
        return self.autoFormat()[0] is not None

    def get(self, force=False, get_redirect=False, throttle=True,
            sysop=False, change_edit_time=True):
        """Return the wiki-text of the page.

        This will retrieve the page from the server if it has not been
        retrieved yet, or if force is True. This can raise the following
        exceptions that should be caught by the calling code:

            NoPage: The page does not exist

            IsRedirectPage: The page is a redirect. The argument of the
                exception is the title of the page it redirects to.

            SectionError: The subject does not exist on a page with a #
                link

        If get_redirect is True, return the redirect text and save the
        target of the redirect; do not raise an exception.
        If force is True, reload all page attributes, including errors.
        If change_edit_time is False, do not check this version for
        changes before saving. This should be used only if the page has
        been loaded previously.
        """
        # NOTE: The following few NoPage exceptions could already be
        # thrown at the Page() constructor. They are raised here instead
        # for convenience, because all scripts are prepared for NoPage
        # exceptions raised by get(), but not for such raised by the
        # constructor.
        # \ufffd represents a badly encoded character; the other
        # characters are disallowed by MediaWiki.
        for illegalChar in u'#<>[]|{}\n\ufffd':
            if illegalChar in self.sectionFreeTitle():
                if verbose:
                    output(u'Illegal character in %s!' % self.aslink())
                raise NoPage('Illegal character in %s!' % self.aslink())
        if self.namespace() == -1:
            raise NoPage('%s is in the Special namespace!' % self.aslink())
        if self.site().isInterwikiLink(self.title()):
            raise NoPage('%s is not a local page on %s!'
                         % (self.aslink(), self.site()))
        if force:
            # When forcing, we retry the page no matter what. Old
            # exceptions and contents do not apply any more.
            for attr in ['_redirarg', '_getexception', '_contents']:
                if hasattr(self, attr):
                    delattr(self, attr)
        else:
            # Make sure we re-raise an exception we got on an earlier
            # attempt
            if hasattr(self, '_redirarg') and not get_redirect:
                raise IsRedirectPage, self._redirarg
            elif hasattr(self, '_getexception'):
                if self._getexception == IsRedirectPage and get_redirect:
                    pass
                else:
                    raise self._getexception
        # Make sure we did try to get the contents once
        if not hasattr(self, '_contents'):
            try:
                self._contents = self._getEditPage(
                    get_redirect = get_redirect, throttle = throttle,
                    sysop = sysop)
                hn = self.section()
                if hn:
                    m = re.search("=+ *%s *=+" % hn, self._contents)
                    if verbose and not m:
                        output(u"WARNING: Section does not exist: %s"
                               % self.aslink(forceInterwiki = True))
            # Store any exceptions for later reference
            except NoPage:
                self._getexception = NoPage
                raise
            except IsRedirectPage, arg:
                self._getexception = IsRedirectPage
                self._redirarg = arg
                if not get_redirect:
                    raise
            except SectionError:
                self._getexception = SectionError
                raise
        return self._contents

    def _getEditPage(self, get_redirect=False, throttle=True, sysop=False,
                     oldid=None, change_edit_time=True):
        """Get the contents of the Page via the edit page.

        Do not use this directly, use get() instead.

        Arguments:
            oldid - Retrieve an old revision (by id), not the current one
            get_redirect - Get the contents, even if it is a redirect page

        This method returns the raw wiki text as a unicode string.
        """
        if verbose:
            output(u'Getting page %s' % self.aslink())
        path = self.site().edit_address(self.urlname())
        if oldid:
            path = path + "&oldid=" + oldid
        # Make sure Brion doesn't get angry by waiting if the last time a
        # page was retrieved was not long enough ago.
        if throttle:
            get_throttle()
        textareaFound = False
        retry_idle_time = 1
        while not textareaFound:
            text = self.site().getUrl(path, sysop = sysop)

            if "<title>Wiki does not exist</title>" in text:
                raise NoSuchSite(u'Wiki %s does not exist yet' % self.site())

            # Extract the actual text from the textarea
            m1 = re.search('<textarea([^>]*)>', text)
            m2 = re.search('</textarea>', text)
            if m1 and m2:
                i1 = m1.end()
                i2 = m2.start()
                textareaFound = True
            else:
                # search for messages with no "view source" (they aren't
                # used in new versions)
                if self.site().mediawiki_message('whitelistedittitle') in text:
                    raise NoPage(u'Page editing is forbidden for anonymous users.')
                elif self.site().has_mediawiki_message('nocreatetitle') and \
                     self.site().mediawiki_message('nocreatetitle') in text:
                    raise NoPage(self.site(), self.aslink(forceInterwiki = True))
                # Bad title
                elif 'var wgPageName = "Special:Badtitle";' in text \
                        or self.site().mediawiki_message('badtitle') in text:
                    raise BadTitle('BadTitle: %s' % self)
                # find out if the username or IP has been blocked
                elif self.site().isBlocked():
                    raise UserBlocked(self.site(), self.aslink(forceInterwiki = True))
                # If there is no text area and the heading is 'View Source'
                # but the user is not blocked, the page does not exist and
                # is locked
                elif self.site().mediawiki_message('viewsource') in text:
                    raise NoPage(self.site(), self.aslink(forceInterwiki = True))
                # Some of the newest versions don't have a "view source"
                # tag for non-existent pages. Check also the div class,
                # because if the language is not English the bot cannot
                # see that the page is blocked.
                elif self.site().mediawiki_message('badaccess') in text or \
                        "<div class=\"permissions-errors\">" in text:
                    raise NoPage(self.site(), self.aslink(forceInterwiki = True))
                elif config.retry_on_fail:
                    if "<title>Wikimedia Error</title>" in text:
                        output(u"Wikimedia has technical problems; will retry in %i minutes."
                               % retry_idle_time)
                    else:
                        output(unicode(text))
                        # We assume that the server is down. Wait some
                        # time, then try again.
                        output(u"WARNING: No text area found on %s%s. Maybe the server is down. Retrying in %i minutes..."
                               % (self.site().hostname(), path, retry_idle_time))
                    time.sleep(retry_idle_time * 60)
                    # Next time wait longer, but not longer than half an hour
                    retry_idle_time *= 2
                    if retry_idle_time > 30:
                        retry_idle_time = 30
                else:
                    output(u"Failed to access wiki")
                    sys.exit(1)

        # Check for restrictions
        m = re.search('var wgRestrictionEdit = \\["(\w+)"\\]', text)
        if m:
            if verbose:
                output(u"DBG> page is locked for group %s" % m.group(1))
            self.editRestriction = m.group(1)
        else:
            self.editRestriction = ''
        m = re.search('var wgRestrictionMove = \\["(\w+)"\\]', text)
        if m:
            self.moveRestriction = m.group(1)
        else:
            self.moveRestriction = ''
        m = re.search('name=["\']baseRevId["\'] type=["\']hidden["\'] value="(\d+)"', text)
        if m:
            self._revisionId = m.group(1)
        if change_edit_time:
            # Get timestamps
            m = re.search('value="(\d+)" name=["\']wpEdittime["\']', text)
            if m:
                self._editTime = m.group(1)
            else:
                self._editTime = "0"
            m = re.search('value="(\d+)" name=["\']wpStarttime["\']', text)
            if m:
                self._startTime = m.group(1)
            else:
                self._startTime = "0"
        # Find out if the page actually exists. Only existing pages have a
        # version history tab.
        if self.site().family.RversionTab(self.site().language()):
            # In case a family does not have version history tabs, or has
            # them in another form
            RversionTab = re.compile(
                self.site().family.RversionTab(self.site().language()))
        else:
            RversionTab = re.compile(
                r'<li id="ca-history"><a href=".*?title=.*?&action=history".*?>.*?</a></li>',
                re.DOTALL)
        matchVersionTab = RversionTab.search(text)
        if not matchVersionTab:
            raise NoPage(self.site(), self.aslink(forceInterwiki = True),
                         "Page does not exist. In rare cases, if you are "
                         "certain the page does exist, look into "
                         "overriding family.RversionTab")
        # Look if the page is on our watchlist
        matchWatching = Rwatchlist.search(text)
        if matchWatching:
            self._isWatched = True
        else:
            self._isWatched = False
        # Now process the contents of the textarea.
        # Unescape HTML characters, strip whitespace.
        pagetext = text[i1:i2]
        pagetext = unescape(pagetext)
        pagetext = pagetext.rstrip()
        if self.site().lang == 'eo':
            pagetext = decodeEsperantoX(pagetext)
        m = self.site().redirectRegex().match(pagetext)
        if m:
            # page text matches the redirect pattern
            if self.section() and not "#" in m.group(1):
                redirtarget = "%s#%s" % (m.group(1), self.section())
            else:
                redirtarget = m.group(1)
            if get_redirect:
                self._redirarg = redirtarget
            else:
                raise IsRedirectPage(redirtarget)
        if self.section():
            # TODO: What the hell is this? Docu please.
            m = re.search("\.3D\_*(\.27\.27+)?(\.5B\.5B)?\_*%s\_*(\.5B\.5B)?(\.27\.27+)?\_*\.3D"
                          % re.escape(self.section()),
                          sectionencode(text, self.site().encoding()))
            if not m:
                try:
                    self._getexception
                except AttributeError:
                    raise SectionError  # Page has no section by this name
        return pagetext

    def getOldVersion(self, oldid, force=False, get_redirect=False,
                      throttle=True, sysop=False, change_edit_time=True):
        """Return text of an old revision of this page; same options as get()."""
        # TODO: should probably check for bad pagename, NoPage, and other
        # exceptions that would prevent retrieving text, as get() does
        # TODO: should this default to change_edit_time = False? If we're
        # not getting the current version, why change the timestamps?
        return self._getEditPage(get_redirect=get_redirect,
                                 throttle=throttle, sysop=sysop, oldid=oldid,
                                 change_edit_time=change_edit_time)
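
    # Illustrative sketch (hypothetical helper, not part of the original
    # API): exists() and isRedirectPage() below fetch the page through the
    # edit screen, while pageAPInfo() asks the API; a cheaper existence
    # probe can try the API first and map its exceptions to a boolean.
    def _exampleExistsFast(self):
        """Return True if this page exists, probing the API first."""
        try:
            self.pageAPInfo()   # returns the last revid when the page exists
            return True
        except IsRedirectPage:
            return True         # redirects exist too
        except NoPage:
            return False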

    def permalink(self):
        """Return the permalink URL for the current revision of this page."""
        return "%s://%s%s&oldid=%i" % (self.site().protocol(),
                                       self.site().hostname(),
                                       self.site().get_address(self.title()),
                                       self.latestRevision())

    def latestRevision(self):
        """Return the latest revision id for this page."""
        if not self._permalink:
            # When we get the page with getall, the permalink is received
            # automatically
            getall(self.site(), [self], force=True)
            # Check for exceptions
            if hasattr(self, '_getexception'):
                raise self._getexception
        return int(self._permalink)

    def previousRevision(self):
        """Return the revision id for the previous revision of this Page."""
        vh = self.getVersionHistory(revCount=2)
        return vh[1][0]

    def exists(self):
        """Return True if page exists on the wiki, even if it's a redirect.

        If the title includes a section, return False if this section isn't
        found.
        """
        try:
            self.get()
        except NoPage:
            return False
        except IsRedirectPage:
            return True
        except SectionError:
            return False
        return True

    def pageAPInfo(self):
        """Return the last revid if the page exists on the wiki.

        Raise IsRedirectPage if it's a redirect.
        Raise NoPage if the page doesn't exist.

        Using the API should be a lot faster; this function exists to
        improve the scripts' performance.
        """
        params = {
            'action': 'query',
            'prop': 'info',
            'titles': self.title(),
        }
        data = query.GetData(params, self.site(), encodeTitle = False)
        pageid = data['query']['pages'].keys()[0]
        if data['query']['pages'][pageid].keys()[0] == 'lastrevid':
            # ok, return the last revid
            return data['query']['pages'][pageid]['lastrevid']
        elif data['query']['pages'][pageid].keys()[0] == 'redirect':
            raise IsRedirectPage
        else:
            # The page should not exist, OR we have problems.
            # Better to double-check in these situations.
            x = self.get()
            return True  # if we reach this point, we had no problems

    def getTemplates(self, tllimit = 5000):
        # action=query&prop=templates&titles=Main Page
        """Return the templates that are used in this page, via the API.

        If no templates are found, return an empty list.

        Note: it returns "only" the first 5000 templates; if there are
        more, they won't be returned, sorry.
        """
        params = {
            'action': 'query',
            'prop': 'templates',
            'titles': self.title(),
            'tllimit': tllimit,
        }
        data = query.GetData(params, self.site(), encodeTitle = False)
        try:
            pageid = data['query']['pages'].keys()[0]
        except KeyError:
            if tllimit != 500:
                # fall back to the lower non-bot API limit
                return self.getTemplates(500)
            else:
                raise Error(data)
        try:
            templates = data['query']['pages'][pageid]['templates']
        except KeyError:
            return list()
        templatesFound = list()
        for template in templates:
            templateName = template['title']
            templatesFound.append(Page(self.site(), templateName))
        return templatesFound

    def isRedirectPage(self):
        """Return True if this is a redirect, False if not or not existing."""
        try:
            self.get()
        except NoPage:
            return False
        except IsRedirectPage:
            return True
        except SectionError:
            return False
        return False

    def isEmpty(self):
        """Return True if the page text has less than 4 characters.

        Character count ignores language links and category links.
        Can raise the same exceptions as get().
        """
        txt = self.get()
        txt = removeLanguageLinks(txt, site = self.site())
        txt = removeCategoryLinks(txt, site = self.site())
        if len(txt) < 4:
            return True
        else:
            return False

    def isTalkPage(self):
        """Return True if this page is in any talk namespace."""
        ns = self.namespace()
        return ns >= 0 and ns % 2 == 1

    def botMayEdit(self, username):
        """Return True if this page allows bots to edit it.

        This will be True if the page doesn't contain {{bots}} or
        {{nobots}}, or it contains them and the active bot is allowed to
        edit this page. (This method is only useful on those sites that
        recognize the bot-exclusion protocol; on other sites, it will
        always return True.)

        The framework enforces this restriction by default. It is possible
        to override this by setting ignore_bot_templates=True in
        user_config.py, or using page.put(force=True).
        """
        if config.ignore_bot_templates:  # Check the "master ignore switch"
            return True
        try:
            templates = self.templatesWithParams(get_redirect=True)
        except (NoPage, IsRedirectPage, SectionError):
            return True
        for template in templates:
            if template[0] == 'Nobots':
                return False
            elif template[0] == 'Bots':
                if len(template[1]) == 0:
                    return True
                else:
                    (type, bots) = template[1][0].split('=', 1)
                    bots = bots.split(',')
                    if type == 'allow':
                        if 'all' in bots or username in bots:
                            return True
                        else:
                            return False
                    if type == 'deny':
                        if 'all' in bots or username in bots:
                            return False
                        else:
                            return True
        # no restricting template found
        return True

    def userName(self):
        """Return name or IP address of last user to edit page.

        Returns None unless page was retrieved with getAll().
        """
        return self._userName

    def isIpEdit(self):
        """Return True if last editor was unregistered.

        Returns None unless page was retrieved with getAll().
        """
        return self._ipedit

    def editTime(self):
        """Return timestamp (in MediaWiki format) of last revision to page.

        Returns None if last edit time is unknown.
        """
        return self._editTime

    def namespace(self):
        """Return the number of the namespace of the page.

        Only recognizes those namespaces defined in family.py. If not
        defined, it will return 0 (the main namespace).
        """
        return self._namespace

    def isCategory(self):
        """Return True if the page is a Category, False otherwise."""
        return self.namespace() == 14

    def isImage(self):
        """Return True if this is an image description page, False otherwise."""
        return self.namespace() == 6

    def isCategoryRedirect(self, text=None):
        """Return True if this is a category redirect page, False otherwise."""
        if not self.isCategory():
            return False
        if not hasattr(self, "_catredirect"):
            if not text:
                text = self.get(get_redirect=True)
            catredirs = self.site().category_redirects()
            for (t, args) in self.templatesWithParams(thistxt=text):
                template = Page(self.site(), t, defaultNamespace=10
                                ).titleWithoutNamespace()  # normalize title
                if template in catredirs:
                    # Get target (first template argument)
                    self._catredirect = self.site().namespace(14) \
                                        + ":" + args[0]
                    break
            else:
                self._catredirect = False
        return bool(self._catredirect)

    def getCategoryRedirectTarget(self):
        """If this is a category redirect, return the target category title."""
        if self.isCategoryRedirect():
            import catlib
            return catlib.Category(self.site(), self._catredirect)
        raise IsNotRedirectPage

    def isDisambig(self):
        """Return True if this is a disambiguation page, False otherwise.

        Relies on the presence of specific templates, identified in the
        Family file or on a wiki page, to identify disambiguation pages.
        By default, loads a list of template names from the Family file;
        if the value in the Family file is None, looks for the list on
        [[MediaWiki:Disambiguationspage]].
        """
        if not hasattr(self, "_isDisambig"):
            if not hasattr(self._site, "_disambigtemplates"):
                distl = self._site.family.disambig(self._site.lang)
                if distl is None:
                    try:
                        disambigpages = Page(self._site,
                                             "MediaWiki:Disambiguationspage")
                        self._site._disambigtemplates = set(
                            link.titleWithoutNamespace()
                            for link in disambigpages.linkedPages()
                            if link.namespace() == 10
                        )
                    except NoPage:
                        self._site._disambigtemplates = set(['Disambig'])
                else:
                    # Normalize template capitalization
                    self._site._disambigtemplates = set(
                        t[:1].upper() + t[1:] for t in distl
                    )
            disambigInPage = self._site._disambigtemplates.intersection(
                self.templates())
            self._isDisambig = len(disambigInPage) > 0
        return self._isDisambig

    def getReferences(self, follow_redirects=True,
                      withTemplateInclusion=True,
                      onlyTemplateInclusion=False, redirectsOnly=False):
        """Yield all pages that link to the page.

        If you need a full list of referring pages, use this:

            pages = [page for page in s.getReferences()]

        Parameters:
        * follow_redirects      - if True, also returns pages that link to
                                  a redirect pointing to the page.
        * withTemplateInclusion - if True, also returns pages where self is
                                  used as a template.
        * onlyTemplateInclusion - if True, only returns pages where self is
                                  used as a template.
        * redirectsOnly         - if True, only returns redirects to self.
        """
        # Temporary bug-fix while researching more robust solution:
        if config.special_page_limit > 999:
            config.special_page_limit = 999
        site = self.site()
        path = self.site().references_address(self.urlname())
        content = SoupStrainer("div", id=self.site().family.content_id)
        try:
            next_msg = self.site().mediawiki_message('whatlinkshere-next')
        except KeyError:
            next_msg = "next %i" % config.special_page_limit
        plural = (config.special_page_limit == 1) and "\\1" or "\\2"
        next_msg = re.sub(r"{{PLURAL:\$1\|(.*?)\|(.*?)}}", plural, next_msg)
        nextpattern = re.compile("^%s$" % next_msg.replace("$1", "[0-9]+"))
        delay = 1
        if self.site().has_mediawiki_message("Isredirect"):
            self._isredirectmessage = self.site().mediawiki_message("Isredirect")
        if self.site().has_mediawiki_message("Istemplate"):
            self._istemplatemessage = self.site().mediawiki_message("Istemplate")
        # to avoid duplicates:
        refPages = set()
        while path:
            output(u'Getting references to %s' % self.aslink())
            get_throttle()
            txt = self.site().getUrl(path)
            body = BeautifulSoup(txt,
                                 convertEntities=BeautifulSoup.HTML_ENTITIES,
                                 parseOnlyThese=content)
            next_text = body.find(text=nextpattern)
            if next_text is not None and next_text.parent.has_key('href'):
                path = next_text.parent['href'].replace("&amp;", "&")
            else:
                path = ""
            reflist = body.find("ul")
            if reflist is None:
                return
            for page in self._parse_reflist(reflist,
                                            follow_redirects,
                                            withTemplateInclusion,
                                            onlyTemplateInclusion,
                                            redirectsOnly):
                if page not in refPages:
                    yield page
                    refPages.add(page)

    def _parse_reflist(self, reflist, follow_redirects=True,
                       withTemplateInclusion=True,
                       onlyTemplateInclusion=False, redirectsOnly=False):
        """For internal use only.

        Parse a "Special:Whatlinkshere" list of references and yield Page
        objects that meet the criteria (used by getReferences).
        """
        for link in reflist("li", recursive=False):
            title = link.a.string
            if title is None:
                output(u"DBG> invalid <li> item in Whatlinkshere: %s" % link)
            try:
                p = Page(self.site(), title)
            except InvalidTitle:
                output(u"DBG> Whatlinkshere:%s contains invalid link to %s"
                       % (self.title(), title))
                continue
            isredirect, istemplate = False, False
            textafter = link.a.findNextSibling(text=True)
            if textafter is not None:
                if self.site().has_mediawiki_message("Isredirect") \
                        and self._isredirectmessage in textafter:
                    # make sure this is really a redirect to this page
                    # (MediaWiki will mark as a redirect any link that
                    # follows a #REDIRECT marker, not just the first one).
                    if p.getRedirectTarget().sectionFreeTitle() \
                            == self.sectionFreeTitle():
                        isredirect = True
                if self.site().has_mediawiki_message("Istemplate") \
                        and self._istemplatemessage in textafter:
                    istemplate = True
            if (withTemplateInclusion or onlyTemplateInclusion
                    or not istemplate
                ) and (not redirectsOnly or isredirect
                ) and (not onlyTemplateInclusion or istemplate
                ):
                yield p
                continue
            if isredirect and follow_redirects:
                sublist = link.find("ul")
                if sublist is not None:
                    for p in self._parse_reflist(sublist,
                                                 follow_redirects,
                                                 withTemplateInclusion,
                                                 onlyTemplateInclusion,
                                                 redirectsOnly):
                        yield p

    def _getActionUser(self, action, restriction = '', sysop = False):
        """Get the user to do an action: sysop or not sysop, or raise an
        exception if the user cannot do that.

        Parameters:
        * action      - the action done, which is the name of the right
        * restriction - the restriction level, or an empty string for no
                        restriction
        * sysop       - initially use sysop user?
        """
        # Login
        self.site().forceLogin(sysop = sysop)

        # Check permissions
        if not self.site().isAllowed(action, sysop):
            if sysop:
                raise LockedPage(
                    u'The sysop user is not allowed to %s in site %s'
                    % (action, self.site()))
            else:
                try:
                    user = self._getActionUser(action, restriction, sysop = True)
                    output(u'The user is not allowed to %s on site %s. Using sysop account.'
                           % (action, self.site()))
                    return user
                except NoUsername:
                    raise LockedPage(
                        u'The user is not allowed to %s on site %s, and no sysop account is defined.'
                        % (action, self.site()))
                except LockedPage:
                    raise

        # Check restrictions
        if not self.site().isAllowed(restriction, sysop):
            if sysop:
                raise LockedPage(
                    u'Page on %s is locked in a way that sysop user cannot %s it'
                    % (self.site(), action))
            else:
                try:
                    user = self._getActionUser(action, restriction, sysop = True)
                    output(u'Page is locked on %s - cannot %s, using sysop account.'
                           % (self.site(), action))
                    return user
                except NoUsername:
                    raise LockedPage(
                        u'Page is locked on %s - cannot %s, and no sysop account is defined.'
                        % (self.site(), action))
                except LockedPage:
                    raise

        return sysop
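
    # Illustrative sketch (hypothetical helper, not part of the original
    # API): getReferences() above is a generator, so materializing, e.g.,
    # only the redirects that point here is a single comprehension.
    def _exampleRedirectsHere(self):
        """Return a list of the redirect pages pointing to this Page."""
        return [p for p in self.getReferences(redirectsOnly=True)]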

    def getRestrictions(self):
        """Get the protections on the page.

        * Returns a restrictions dictionary. Keys are 'edit' and 'move',
          values are None (no restriction for that action) or
          [level, expiry]:
            * level is the level of auth needed to perform that action
              ('autoconfirmed' or 'sysop')
            * expiry is the expiration time of the restriction
        """
        #, titles = None
        #if titles:
        #    restrictions = {}
        #else:
        restrictions = { 'edit': None, 'move': None }
        try:
            api_url = self.site().api_address()
        except NotImplementedError:
            return restrictions

        predata = {
            'action': 'query',
            'prop': 'info',
            'inprop': 'protection',
            'titles': self.title(),
        }
        #if titles:
        #    predata['titles'] = query.ListToParam(titles)
        text = query.GetData(predata, self.site())['query']['pages']
        for pageid in text:
            if text[pageid].has_key('missing'):
                self._getexception = NoPage
                raise NoPage('Page %s does not exist' % self.aslink())
            elif not text[pageid].has_key('pageid'):
                # Don't know what may happen here.
                # We may want to have better error handling
                raise Error("BUG> API problem.")
            if text[pageid]['protection'] != []:
                #if titles:
                #    restrictions = dict([ detail['type'], [ detail['level'], detail['expiry'] ] ]
                #                        for detail in text[pageid]['protection'])
                #else:
                restrictions = dict([ detail['type'],
                                      [ detail['level'], detail['expiry'] ] ]
                                    for detail in text[pageid]['protection'])
        return restrictions

    def put_async(self, newtext, comment=None, watchArticle=None,
                  minorEdit=True, force=False, callback=None):
        """Put page on queue to be saved to wiki asynchronously.

        Asynchronous version of put (takes the same arguments), which
        places pages on a queue to be saved by a daemon thread. All
        arguments are the same as for .put(), except --

        callback: a callable object that will be called after the page put
                  operation; this object must take two arguments:
                  (1) a Page object, and (2) an exception instance, which
                  will be None if the page was saved successfully.

        The callback is intended to be used by bots that need to keep
        track of which saves were successful.
        """
        try:
            page_put_queue.mutex.acquire()
            try:
                _putthread.start()
            except (AssertionError, RuntimeError):
                pass
        finally:
            page_put_queue.mutex.release()
        page_put_queue.put((self, newtext, comment, watchArticle,
                            minorEdit, force, callback))
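
    # Illustrative sketch (hypothetical callback, not part of the original
    # API): put_async() above reports the outcome of each queued save
    # through its callback; the minimal useful callback just checks whether
    # the error argument is None. Passing the unbound method works in
    # Python 2: page.put_async(text, callback=Page._examplePutCallback).
    def _examplePutCallback(self, error):
        """Report the outcome of an asynchronous save of this Page."""
        if error is None:
            output(u'Saved %s' % self.aslink())
        else:
            output(u'Failed to save %s: %s' % (self.aslink(), error))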

    def put(self, newtext, comment=None, watchArticle=None, minorEdit=True,
            force=False, sysop=False, botflag=True, maxTries=-1):
        """Save the page with the contents of the first argument as the text.

        Optional parameters:
          comment:      a unicode string that is to be used as the summary
                        for the modification.
          watchArticle: a bool, add or remove this Page to/from bot user's
                        watchlist (if None, leave watchlist status unchanged)
          minorEdit:    mark this edit as minor if True
          force:        ignore botMayEdit() setting.
          maxTries:     the maximum amount of save attempts. -1 for infinite.
        """
        # Login
        try:
            self.get()
        except:
            pass
        sysop = self._getActionUser(action = 'edit',
                                    restriction = self.editRestriction,
                                    sysop = sysop)
        username = self.site().loggedInAs()

        # Check blocks
        self.site().checkBlocks(sysop = sysop)

        # Determine if we are allowed to edit
        if not force:
            if not self.botMayEdit(username):
                raise LockedPage(
                    u'Not allowed to edit %s because of a restricting template'
                    % self.aslink())

        # If there is an unchecked edit restriction, we need to load the page
        if self._editrestriction:
            output(u'Page %s is semi-protected. Getting edit page to find out if we are allowed to edit.'
                   % self.aslink())
            self.get(force = True, change_edit_time = False)
            self._editrestriction = False

        # If no comment is given for the change, use the default
        comment = comment or action

        if config.cosmetic_changes and not self.isTalkPage():
            old = newtext
            if not config.cosmetic_changes_mylang_only or \
               (self.site().family.name == config.family
                    and self.site().lang == config.mylang):
                import cosmetic_changes
                ccToolkit = cosmetic_changes.CosmeticChangesToolkit(self.site())
                newtext = ccToolkit.change(newtext)
                if comment and old.strip().replace('\r\n', '\n') \
                        != newtext.strip().replace('\r\n', '\n'):
                    comment += translate(self.site(), cosmetic_changes.msg_append)

        if watchArticle is None:
            # if the page was loaded via get(), we know its status
            if hasattr(self, '_isWatched'):
                watchArticle = self._isWatched
            else:
                import watchlist
                watchArticle = watchlist.isWatched(self.title(),
                                                   site = self.site())
        newPage = not self.exists()

        # if posting to an Esperanto wiki, we must e.g. write Bordeauxx
        # instead of Bordeaux
        if self.site().lang == 'eo':
            newtext = encodeEsperantoX(newtext)
            comment = encodeEsperantoX(comment)

        return self._putPage(newtext, comment, watchArticle, minorEdit,
                             newPage, self.site().getToken(sysop = sysop),
                             sysop = sysop, botflag=botflag,
                             maxTries=maxTries)

    def _encodeArg(self, arg, msgForError):
        """Encode an ascii string/Unicode string to the site's encoding."""
        try:
            return arg.encode(self.site().encoding())
        except UnicodeDecodeError, e:
            # happens when arg is a non-ascii bytestring: when re-encoding
            # bytestrings, python first decodes to ascii
            e.reason += ' (cannot convert input %s string to unicode)' % msgForError
            raise e
        except UnicodeEncodeError, e:
            # happens when arg is unicode
            e.reason += ' (cannot convert %s to wiki encoding %s)' \
                        % (msgForError, self.site().encoding())
            raise e

    def _putPage(self, text, comment=None, watchArticle=False,
                 minorEdit=True, newPage=False, token=None, newToken=False,
                 sysop=False, captcha=None, botflag=True, maxTries=-1):
        """Upload 'text' as new content of Page via the API.

        Don't use this directly, use put() instead.
        """
        try:
            if config.use_api and self.site().versionnumber() >= 13:
                apitest = self.site().api_address()
                del apitest
            else:
                # API not enabled, or the MediaWiki version doesn't support it
                raise NotImplementedError
        except NotImplementedError:
            return self._putPageOld(text, comment, watchArticle, minorEdit,
                                    newPage, token, newToken, sysop, captcha,
                                    botflag, maxTries)

        retry_attempt = 1
        retry_delay = 1
        dblagged = False
        params = {
            'action': 'edit',
            'title': self.title(),
            'text': self._encodeArg(text, 'text'),
            'summary': self._encodeArg(comment, 'summary'),
        }
        if token:
            params['token'] = token
        else:
            params['token'] = self.site().getToken(sysop = sysop)
        # Add server lag parameter (see config.py for details)
        if config.maxlag:
            params['maxlag'] = str(config.maxlag)
        if self._editTime:
            params['basetimestamp'] = self._editTime
        else:
            params['basetimestamp'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
        if self._startTime:
            params['starttimestamp'] = self._startTime
        else:
            params['starttimestamp'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
        if botflag:
            params['bot'] = 1
        if minorEdit:
            params['minor'] = 1
        else:
            params['notminor'] = 1
        if watchArticle:
            params['watch'] = 1
        #else:
        #    params['unwatch'] = 1
        if captcha:
            params['captchaid'] = captcha['id']
            params['captchaword'] = captcha['answer']

        while True:
            if maxTries == 0:
                raise MaxTriesExceededError()
            maxTries -= 1
            # Check whether we are not too quickly after the previous
            # putPage, and wait a bit until the interval is acceptable
            if not dblagged:
                put_throttle()
            # Which web-site host are we submitting to?
            if newPage:
                output(u'Creating page %s' % self.aslink())
                params['createonly'] = 1
            else:
                output(u'Updating page %s' % self.aslink())
                params['nocreate'] = 1
            # Submit the prepared information
            try:
                response, data = query.GetData(params, self.site(),
                                               sysop=sysop,
                                               back_response = True)
                if query.IsString(data):
                    raise KeyError
            except httplib.BadStatusLine, line:
                raise PageNotSaved('Bad status line: %s' % line.line)
            except ServerError:
                output(u''.join(traceback.format_exception(*sys.exc_info())))
                retry_attempt += 1
                if retry_attempt > config.maxretries:
                    raise
                output(u'Got a server error when putting %s; will retry in %i minute%s.'
                       % (self.aslink(), retry_delay,
                          retry_delay != 1 and "s" or ""))
                time.sleep(60 * retry_delay)
                retry_delay *= 2
                if retry_delay > 30:
                    retry_delay = 30
                continue
            except ValueError:
                # the API result cannot be decoded
                output(u"Server error encountered; will retry in %i minute%s."
                       % (retry_delay, retry_delay != 1 and "s" or ""))
                time.sleep(60 * retry_delay)
                retry_delay *= 2
                if retry_delay > 30:
                    retry_delay = 30
                continue

            # If it has gotten this far then we should reset dblagged
            dblagged = False

            # Check blocks
            self.site().checkBlocks(sysop = sysop)

            if response.status == 500:
                output(u"Server error encountered; will retry in %i minute%s."
                       % (retry_delay, retry_delay != 1 and "s" or ""))
                time.sleep(60 * retry_delay)
                retry_delay *= 2
                if retry_delay > 30:
                    retry_delay = 30
                continue

            if data.has_key('error'):
                # All available error keys in edit mode (from ApiBase.php):
                # 'noimageredirect-anon':"Anonymous users can't create image redirects",
                # 'noimageredirect':"You don't have permission to create image redirects",
                # 'filtered':"The filter callback function refused your edit",
                # 'noedit-anon':"Anonymous users can't edit pages",
                # 'noedit':"You don't have permission to edit pages",
                # 'emptypage':"Creating new, empty pages is not allowed",
                # 'badmd5':"The supplied MD5 hash was incorrect",
                # 'notext':"One of the text, appendtext, prependtext and undo parameters must be set",
                # 'emptynewsection':'Creating empty new sections is not possible.',
                # 'revwrongpage':"r\$1 is not a revision of ``\$2''",
                # 'undofailure':'Undo failed due to conflicting intermediate edits',

                # for debug only
                #------------------------
                if verbose:
                    output("An error occurred; result: %s\nstatus: %s\nresponse: %s"
                           % (data, response.status, response.reason))
                    faked = params
                    del faked['text'], faked['format']
                    output("OriginalData:%s" % faked)
                    del faked
                #------------------------
                errorCode = data['error']['code']
                # cannot handle longpageerror and PageNotSaved yet
                if errorCode == 'maxlag' or response.status == 503:
                    # server lag; MediaWiki recommends waiting 5 seconds
                    # and retrying
                    if verbose:
                        output(data, newline=False)
                    output(u"Pausing 5 seconds due to database server lag.")
                    dblagged = True
                    time.sleep(5)
                    continue
                elif errorCode == 'editconflict':
                    # 'editconflict':"Edit conflict detected",
                    raise EditConflict(u'An edit conflict has occurred.')
                elif errorCode == 'spamdetected':
                    # 'spamdetected':"Your edit was refused because it contained a spam fragment: ``\$1''",
                    raise SpamfilterError(data['error']['info'][62:-2])
                elif errorCode == 'pagedeleted':
                    # 'pagedeleted':"The page has been deleted since you fetched its timestamp",
                    # Make sure your system clock is correct if this error
                    # occurs without any reason!
                    # raise EditConflict(u'Someone deleted the page.')
                    # No raise, simply define these variables and retry:
                    params['recreate'] = 1
                    if self._editTime:
                        params['basetimestamp'] = self._editTime
                    else:
                        params['basetimestamp'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
                    if self._startTime:
                        params['starttimestamp'] = self._startTime
                    else:
                        params['starttimestamp'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
                    continue
                elif errorCode == 'readonly':
                    # 'readonly':"The wiki is currently in read-only mode"
                    output(u"The database is currently locked for write access; will retry in %i minute%s."
                           % (retry_delay, retry_delay != 1 and "s" or ""))
                    time.sleep(60 * retry_delay)
                    retry_delay *= 2
                    if retry_delay > 30:
                        retry_delay = 30
                    continue
                elif errorCode == 'contenttoobig':
                    # 'contenttoobig':"The content you supplied exceeds the article size limit of \$1 kilobytes",
                    raise LongPageError(len(params['text']),
                                        int(data['error']['info'][59:-10]))
                elif errorCode in ['protectedpage', 'customcssjsprotected',
                                   'cascadeprotected', 'protectednamespace',
                                   'protectednamespace-interface']:
                    # 'protectedpage':"The ``\$1'' right is required to edit this page"
                    # 'cascadeprotected':"The page you're trying to edit is protected because it's included in a cascade-protected page"
                    # 'customcssjsprotected':"You're not allowed to edit custom CSS and JavaScript pages"
                    # 'protectednamespace':"You're not allowed to edit pages in the ``\$1'' namespace"
                    # 'protectednamespace-interface':"You're not allowed to edit interface messages"
                    #
                    # The page is locked. This should have already been
                    # detected when getting the page, but there are some
                    # reasons why this didn't work, e.g. the page might be
                    # locked via a cascade lock.
                    try:
                        # Page is locked - try using the sysop account,
                        # unless we're using one already
                        if sysop:
                            # Unknown permissions error
                            raise LockedPage()
                        else:
                            self.site().forceLogin(sysop = True)
                            output(u'Page is locked, retrying using sysop account.')
                            return self._putPage(text, comment, watchArticle,
                                                 minorEdit, newPage,
                                                 token=self.site().getToken(sysop = True),
                                                 sysop = True)
                    except NoUsername:
                        raise LockedPage()
                elif errorCode == 'badtoken':
                    if newToken:
                        output(u"Edit token has failed. Giving up.")
                    else:
                        # We might have been using an outdated token
                        output(u"Edit token has failed. Retrying.")
                        return self._putPage(text, comment, watchArticle,
                                             minorEdit, newPage,
                                             token=self.site().getToken(sysop = sysop, getagain = True),
                                             newToken = True, sysop = sysop)
                # I think the error message title was changed from
                # "Wikimedia Error" to "Wikipedia has a problem", but I'm
                # not sure. Maybe we could just check for HTTP Status 500
                # (Internal Server Error)?
                else:
                    output("Unknown Error. API Error code:%s" % data['error']['code'])
                    output("Information:%s" % data['error']['info'])
            else:
                if data['edit']['result'] == u"Success":
                    # The status code for a completed page update in
                    # ordinary (non-API) mode is 302 Found, but the API
                    # always answers 200 OK and only sends "Success" back
                    # as a string. If the page update succeeded, we return
                    # 302 for the benefit of scripts that rely on the
                    # status code.
                    return 302, response.reason, data

            solve = self.site().solveCaptcha(data)
            if solve:
                return self._putPage(text, comment, watchArticle, minorEdit,
                                     newPage, token, newToken, sysop,
                                     captcha=solve)

            return response.status, response.reason, data
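
    # Illustrative sketch (hypothetical helper, not part of the original
    # API): _putPage() above and _putPageOld() below both retry transient
    # failures with the same capped exponential backoff; extracted, the
    # pattern is simply:
    def _exampleBackoff(self, retry_delay):
        """Sleep 'retry_delay' minutes and return the next, capped delay."""
        time.sleep(60 * retry_delay)
        retry_delay *= 2
        if retry_delay > 30:
            retry_delay = 30
        return retry_delay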

    def _putPageOld(self, text, comment=None, watchArticle=False,
                    minorEdit=True, newPage=False, token=None,
                    newToken=False, sysop=False, captcha=None, botflag=True,
                    maxTries=-1):
        """Upload 'text' as new content of Page by filling out the edit form.

        Don't use this directly, use put() instead.
        """
        host = self.site().hostname()
        # Get the address of the page on that host.
        address = self.site().put_address(self.urlname())
        predata = {
            'wpSave': '1',
            'wpSummary': self._encodeArg(comment, 'edit summary'),
            'wpTextbox1': self._encodeArg(text, 'wikitext'),
            # As of October 2008, MW HEAD requires wpSection to be set.
            # We will need to fill this more smartly if we ever decide to
            # edit by section
            'wpSection': '',
        }
        if not botflag:
            predata['bot'] = '0'
        if captcha:
            predata["wpCaptchaId"] = captcha['id']
            predata["wpCaptchaWord"] = captcha['answer']
        # Add server lag parameter (see config.py for details)
        if config.maxlag:
            predata['maxlag'] = str(config.maxlag)
        # Except if the page is new, we need to supply the time of the
        # previous version to the wiki to prevent edit collisions.
        # As of Oct 2008, these must be filled also for new pages
        if self._editTime:
            predata['wpEdittime'] = self._editTime
        else:
            predata['wpEdittime'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
        if self._startTime:
            predata['wpStarttime'] = self._startTime
        else:
            predata['wpStarttime'] = time.strftime('%Y%m%d%H%M%S', time.gmtime())
        if self._revisionId:
            predata['baseRevId'] = self._revisionId
        # Pass the minorEdit and watchArticle arguments to the Wiki.
        if minorEdit:
            predata['wpMinoredit'] = '1'
        if watchArticle:
            predata['wpWatchthis'] = '1'
        # Give the token, but only if one is supplied.
        if token:
            ##output(token) # for debug use only
            predata['wpEditToken'] = token
        # Sorry, single-site exception...
        if self.site().fam().name == 'loveto' and self.site().language() == 'recipes':
            predata['masteredit'] = '1'

        retry_delay = 1
        retry_attempt = 1
        dblagged = False
        while True:
            if maxTries == 0:
                raise MaxTriesExceededError()
            maxTries -= 1
            # Check whether we are not too quickly after the previous
            # putPage, and wait a bit until the interval is acceptable
            if not dblagged:
                put_throttle()
            # Which web-site host are we submitting to?
            if newPage:
                output(u'Creating page %s' % self.aslink())
            else:
                output(u'Changing page %s' % self.aslink())
            # Submit the prepared information
            if self.site().hostname() in config.authenticate.keys():
                predata["Content-type"] = "application/x-www-form-urlencoded"
                predata["User-agent"] = useragent
                data = self.site().urlEncode(predata)
                response = urllib2.urlopen(
                    urllib2.Request(self.site().protocol() + '://'
                                    + self.site().hostname() + address,
                                    data))
                # I'm not sure what to check in this case, so I just assume
                # things went ok. Very naive, I agree.
                data = u''  # No idea how to get the info now.
                return None
            try:
                response, data = self.site().postForm(address, predata, sysop)
                if response.status == 503:
                    if 'x-database-lag' in response.msg.keys():
                        # server lag; MediaWiki recommends waiting 5
                        # seconds and retrying
                        if verbose:
                            output(data, newline=False)
                        output(u"Pausing 5 seconds due to database server lag.")
                        dblagged = True
                        time.sleep(5)
                        continue
                    # Squid error 503
                    raise ServerError(response.status)
            except httplib.BadStatusLine, line:
                raise PageNotSaved('Bad status line: %s' % line.line)
            except ServerError:
                output(u''.join(traceback.format_exception(*sys.exc_info())))
                retry_attempt += 1
                if retry_attempt > config.maxretries:
                    raise
                output(u'Got a server error when putting %s; will retry in %i minute%s.'
                       % (self.aslink(), retry_delay,
                          retry_delay != 1 and "s" or ""))
                time.sleep(60 * retry_delay)
                retry_delay *= 2
                if retry_delay > 30:
                    retry_delay = 30
                continue
            # If it has gotten this far then we should reset dblagged
            dblagged = False
            # Check blocks
            self.site().checkBlocks(sysop = sysop)
            # A second text area means that an edit conflict has occurred.
editconflict = re.compile('id=["\']wpTextbox2[\'"] name="wpTextbox2"') if editconflict.search(data): raise EditConflict(u'An edit conflict has occurred.') # remove the wpAntispam keyword before checking for Spamfilter data = re.sub(u'(?s)<label for="wpAntispam">.*?</label>', '', data) if self.site().has_mediawiki_message("spamprotectiontitle")\ and self.site().mediawiki_message('spamprotectiontitle') in data: try: reasonR = re.compile(re.escape(self.site().mediawiki_message('spamprotectionmatch')).replace('\$1', '(?P<url>[^<]*)')) url = reasonR.search(data).group('url') except: # Some wikis have modified the spamprotectionmatch # template in a way that the above regex doesn't work, # e.g. on he.wikipedia the template includes a # wikilink, and on fr.wikipedia there is bold text. # This is a workaround for this: it takes the region # which should contain the spamfilter report and the # URL. It then searches for a plaintext URL. relevant = data[data.find('<!-- start content -->')+22:data.find('<!-- end content -->')].strip() # Throw away all the other links etc. relevant = re.sub('<.*?>', '', relevant) relevant = relevant.replace('&#58;', ':') # MediaWiki only spam-checks HTTP links, and only the # domain name part of the URL. m = re.search('http://[\w\-\.]+', relevant) if m: url = m.group() else: # Can't extract the exact URL. Let the user search. url = relevant raise SpamfilterError(url) if '<label for=\'wpRecreate\'' in data: # Make sure your system clock is correct if this error occurs # without any reason! # raise EditConflict(u'Someone deleted the page.') # No raise, simply define these variables and retry: if self._editTime: predata['wpEdittime'] = self._editTime else: predata['wpEdittime'] = time.strftime('%Y%m%d%H%M%S', time.gmtime()) if self._startTime: predata['wpStarttime'] = self._startTime else: predata['wpStarttime'] = time.strftime('%Y%m%d%H%M%S', time.gmtime()) continue if self.site().has_mediawiki_message("viewsource")\ and self.site().mediawiki_message('viewsource') in data: # The page is locked. This should have already been # detected when getting the page, but there are some # reasons why this didn't work, e.g. the page might be # locked via a cascade lock. try: # Page is locked - try using the sysop account, unless we're using one already if sysop: # Unknown permissions error raise LockedPage() else: self.site().forceLogin(sysop = True) output(u'Page is locked, retrying using sysop account.') return self._putPageOld(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = True), sysop = True) except NoUsername: raise LockedPage() if not newToken and "<textarea" in data: ##if "<textarea" in data: # for debug use only, if badtoken still happen # We might have been using an outdated token output(u"Changing page has failed. Retrying.") return self._putPageOld(text, comment, watchArticle, minorEdit, newPage, token=self.site().getToken(sysop = sysop, getagain = True), newToken = True, sysop = sysop) # I think the error message title was changed from "Wikimedia Error" # to "Wikipedia has a problem", but I'm not sure. Maybe we could # just check for HTTP Status 500 (Internal Server Error)? if ("<title>Wikimedia Error</title>" in data or "has a problem</title>" in data) \ or response.status == 500: output(u"Server error encountered; will retry in %i minute%s."
% (retry_delay, retry_delay != 1 and "s" or "")) time.sleep(60 * retry_delay) retry_delay *= 2 if retry_delay > 30: retry_delay = 30 continue if self.site().mediawiki_message('readonly') in data or self.site().mediawiki_message('readonly_lag') in data: output(u"The database is currently locked for write access; will retry in %i minute%s." % (retry_delay, retry_delay != 1 and "s" or "")) time.sleep(60 * retry_delay) retry_delay *= 2 if retry_delay > 30: retry_delay = 30 continue if self.site().has_mediawiki_message('longpageerror'): # FIXME: Long page error detection isn't working in Vietnamese Wikipedia. long_page_errorR = re.compile( # Some wikis (e.g. Lithuanian and Slovak Wikipedia) use {{plural}} in # [[MediaWiki:longpageerror]] re.sub(r'\\{\\{plural\\:.*?\\}\\}', '.*?', re.escape( html2unicode( self.site().mediawiki_message('longpageerror') ) ) ).replace("\$1", "(?P<length>[\d,.\s]+)", 1).replace("\$2", "(?P<limit>[\d,.\s]+)", 1), re.UNICODE) match = long_page_errorR.search(data) if match: # Some wikis (e.g. Lithuanian Wikipedia) don't use $2 parameter in # [[MediaWiki:longpageerror]] longpage_length = 0 ; longpage_limit = 0 if 'length' in match.groupdict(): longpage_length = match.group('length') if 'limit' in match.groupdict(): longpage_limit = match.group('limit') raise LongPageError(longpage_length, longpage_limit) # We might have been prompted for a captcha if the # account is not autoconfirmed, checking.... ## output('%s' % data) # WHY? solve = self.site().solveCaptcha(data) if solve: return self._putPageOld(text, comment, watchArticle, minorEdit, newPage, token, newToken, sysop, captcha=solve) # We are expecting a 302 to the action=view page. I'm not sure why this was removed in r5019 if data.strip() != u"": # Something went wrong, and we don't know what. Show the # HTML code that hopefully includes some error message. output(u"ERROR: Unexpected response from wiki server.") output(u" %s (%s) " % (response.status, response.reason)) output(data) # Unexpected responses should raise an error and not pass, # be it silently or loudly. This should raise an error if 'name="wpTextbox1"' in data and 'var wgAction = "submit"' in data: # We are on the preview page, so the page was not saved raise PageNotSaved return response.status, response.reason, data def canBeEdited(self): """Return bool indicating whether this page can be edited. This returns True if and only if: * page is unprotected, and bot has an account for this site, or * page is protected, and bot has a sysop account for this site. """ try: self.get() except: pass if self.editRestriction == 'sysop': userdict = config.sysopnames else: userdict = config.usernames try: userdict[self.site().family.name][self.site().lang] return True except: # We don't have a user account for that wiki, or the # page is locked and we don't have a sysop account. return False def toggleTalkPage(self): """Return the other member of the article-talk page pair for this Page. If self is a talk page, returns the associated content page; otherwise, returns the associated talk page. Returns None if self is a special page. """ ns = self.namespace() if ns < 0: # Special page return None if self.isTalkPage(): if self.namespace() == 1: return Page(self.site(), self.titleWithoutNamespace()) else: return Page(self.site(), self.site().namespace(ns - 1) + ':' + self.titleWithoutNamespace()) else: return Page(self.site(), self.site().namespace(ns + 1) + ':' + self.titleWithoutNamespace()) def interwiki(self): """Return a list of interwiki links in the page text.
This will retrieve the page to do its work, so it can raise the same exceptions that are raised by the get() method. The return value is a list of Page objects for each of the interwiki links in the page text. """ if hasattr(self, "_interwikis"): return self._interwikis text = self.get() # Replace {{PAGENAME}} by its value for pagenametext in self.site().family.pagenamecodes( self.site().language()): text = text.replace(u"{{%s}}" % pagenametext, self.title()) ll = getLanguageLinks(text, insite=self.site(), pageLink=self.aslink()) result = ll.values() self._interwikis = result return result def categories(self, get_redirect=False): """Return a list of categories that the article is in. This will retrieve the page text to do its work, so it can raise the same exceptions that are raised by the get() method. The return value is a list of Category objects, one for each of the category links in the page text. """ try: category_links_to_return = getCategoryLinks(self.get(get_redirect=get_redirect), self.site()) except NoPage: category_links_to_return = [] return category_links_to_return def __cmp__(self, other): """Test for equality and inequality of Page objects""" if not isinstance(other, Page): # especially, return -1 if other is None return -1 if self._site == other._site: return cmp(self._title, other._title) else: return cmp(self._site, other._site) def __hash__(self): # Pseudo method that makes it possible to store Page objects as keys # in hash-tables. This relies on the fact that the string # representation of an instance can not change after the construction. return hash(str(self)) def linkedPages(self, withImageLinks = False): """Return a list of Pages that this Page links to. Excludes interwiki and category links, and also image links by default. """ result = [] try: thistxt = removeLanguageLinks(self.get(get_redirect=True), self.site()) except NoPage: raise except IsRedirectPage: raise except SectionError: return [] thistxt = removeCategoryLinks(thistxt, self.site()) # remove HTML comments, pre, nowiki, and includeonly sections # from text before processing thistxt = removeDisabledParts(thistxt) # resolve {{ns:-1}} or {{ns:Help}} thistxt = self.site().resolvemagicwords(thistxt) for match in Rlink.finditer(thistxt): title = match.group('title') title = title.replace("_", " ").strip(" ") if title.startswith("#"): # this is an internal section link continue if not self.site().isInterwikiLink(title): try: page = Page(self.site(), title) try: hash(str(page)) except Exception: raise Error(u"Page %s contains invalid link to [[%s]]." % (self.title(), title)) except Error: if verbose: output(u"Page %s contains invalid link to [[%s]]." % (self.title(), title)) continue if not withImageLinks and page.isImage(): continue if page.sectionFreeTitle(): result.append(page) return result def imagelinks(self, followRedirects=False, loose=False): """Return a list of ImagePage objects for images displayed on this Page. Includes images in galleries. If loose is True, this will find anything that looks like it could be an image. This is useful for finding, say, images that are passed as parameters to templates. 
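A hedged usage sketch (the page title here is hypothetical):

    page = Page(getSite(), u'Example')
    for image in page.imagelinks():
        output(image.title())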
""" results = [] # Find normal images for page in self.linkedPages(withImageLinks = True): if page.isImage(): # convert Page object to ImagePage object imagePage = ImagePage(page.site(), page.title()) results.append(imagePage) # Find images in galleries pageText = self.get(get_redirect=followRedirects) galleryR = re.compile('<gallery>.*?</gallery>', re.DOTALL) galleryEntryR = re.compile('(?P<title>(%s|%s):.+?)(\|.+)?\n' % (self.site().image_namespace(), self.site().family.image_namespace(code = '_default'))) for gallery in galleryR.findall(pageText): for match in galleryEntryR.finditer(gallery): page = ImagePage(self.site(), match.group('title')) results.append(page) if loose: ns = getSite().image_namespace() imageR = re.compile('\w\w\w+\.(?:gif|png|jpg|jpeg|svg|JPG|xcf|pdf|mid|ogg|djvu)', re.IGNORECASE) for imageName in imageR.findall(pageText): results.append(ImagePage(self.site(), ns + ':' + imageName)) return list(set(results)) def templates(self, get_redirect=False): """Return a list of titles (unicode) of templates used on this Page. Template parameters are ignored. """ if not hasattr(self, "_templates"): self._templates = list(set([template for (template, param) in self.templatesWithParams( get_redirect=get_redirect)])) return self._templates def templatesWithParams(self, thistxt=None, get_redirect=False): """Return a list of templates used on this Page. Return value is a list of tuples. There is one tuple for each use of a template in the page, with the template title as the first entry and a list of parameters as the second entry. If thistxt is set, it is used instead of current page content. """ if not thistxt: try: thistxt = self.get(get_redirect=get_redirect) except (IsRedirectPage, NoPage): return [] # remove commented-out stuff etc. thistxt = removeDisabledParts(thistxt) # marker for inside templates or parameters marker = findmarker(thistxt, u'@@', u'@') # marker for links marker2 = findmarker(thistxt, u'##', u'#') # marker for math marker3 = findmarker(thistxt, u'%%', u'%') result = [] inside = {} count = 0 Rtemplate = re.compile( ur'{{(msg:)?(?P<name>[^{\|]+?)(\|(?P<params>[^{]*?))?}}') Rlink = re.compile(ur'\[\[[^\]]+\]\]') Rmath = re.compile(ur'<math>[^<]+</math>') Rmarker = re.compile(ur'%s(\d+)%s' % (marker, marker)) Rmarker2 = re.compile(ur'%s(\d+)%s' % (marker2, marker2)) Rmarker3 = re.compile(ur'%s(\d+)%s' % (marker3, marker3)) # Replace math with markers maths = {} count = 0 for m in Rmath.finditer(thistxt): count += 1 text = m.group() thistxt = thistxt.replace(text, '%s%d%s' % (marker3, count, marker3)) maths[count] = text while Rtemplate.search(thistxt) is not None: for m in Rtemplate.finditer(thistxt): # Make sure it is not detected again count += 1 text = m.group() thistxt = thistxt.replace(text, '%s%d%s' % (marker, count, marker)) # Make sure stored templates don't contain markers for m2 in Rmarker.finditer(text): text = text.replace(m2.group(), inside[int(m2.group(1))]) for m2 in Rmarker3.finditer(text): text = text.replace(m2.group(), maths[int(m2.group(1))]) inside[count] = text # Name name = m.group('name').strip() m2 = Rmarker.search(name) or Rmath.search(name) if m2 is not None: # Doesn't detect templates whose name changes, # or templates whose name contains math tags continue if self.site().isInterwikiLink(name): continue # {{DEFAULTSORT:...}} or {{#if: }} if name.startswith('DEFAULTSORT:') or name.startswith('#'): continue try: name = Page(self.site(), name).title() except InvalidTitle: if name: output( u"Page %s contains invalid template name 
{{%s}}." % (self.title(), name.strip())) continue # Parameters paramString = m.group('params') params = [] if paramString: # Replace links to markers links = {} count2 = 0 for m2 in Rlink.finditer(paramString): count2 += 1 text = m2.group() paramString = paramString.replace(text, '%s%d%s' % (marker2, count2, marker2)) links[count2] = text # Parse string markedParams = paramString.split('|') # Replace markers for param in markedParams: for m2 in Rmarker.finditer(param): param = param.replace(m2.group(), inside[int(m2.group(1))]) for m2 in Rmarker2.finditer(param): param = param.replace(m2.group(), links[int(m2.group(1))]) for m2 in Rmarker3.finditer(param): param = param.replace(m2.group(), maths[int(m2.group(1))]) params.append(param) # Add it to the result result.append((name, params)) return result def getRedirectTarget(self): """Return a Page object for the target this Page redirects to. If this page is not a redirect page, will raise an IsNotRedirectPage exception. This method also can raise a NoPage exception. """ try: self.get() except NoPage: raise except IsRedirectPage, err: target = err[0].replace('"', '"') # otherwise it will return error with # pages with " inside. if '|' in target: warnings.warn("'%s' has a | character, this makes no sense" % target, Warning) return Page(self.site(), target) else: raise IsNotRedirectPage(self) def getVersionHistory(self, forceReload=False, reverseOrder=False, getAll=False, revCount=500): """Load the version history page and return history information. Return value is a list of tuples, where each tuple represents one edit and is built of revision id, edit date/time, user name, and edit summary. Starts with the most current revision, unless reverseOrder is True. Defaults to getting the first revCount edits, unless getAll is True. """ site = self.site() # regular expression matching one edit in the version history. # results will have 4 groups: oldid, edit date/time, user name, and edit # summary. if self.site().versionnumber() < 4: editR = re.compile('<li>\(.*?\)\s+\(.*\).*?<a href=".*?oldid=([0-9]*)" title=".*?">([^<]*)</a> <span class=\'user\'><a href=".*?" title=".*?">([^<]*?)</a></span>.*?(?:<span class=\'comment\'>(.*?)</span>)?</li>') elif self.site().versionnumber() < 15: editR = re.compile('<li>\(.*?\)\s+\(.*\).*?<a href=".*?oldid=([0-9]*)" title=".*?">([^<]*)</a> (?:<span class=\'history-user\'>|)<a href=".*?" title=".*?">([^<]*?)</a>.*?(?:</span>|).*?(?:<span class=[\'"]comment[\'"]>(.*?)</span>)?</li>') else: editR = re.compile(r'<li class=".*?">\((?:\w*|<a[^<]*</a>)\)\s\((?:\w*|<a[^<]*</a>)\).*?<a href=".*?([0-9]*)" title=".*?">([^<]*)</a> <span class=\'history-user\'><a [^>]*?>([^<]*?)</a>.*?</span></span>(?: <span class="minor">m</span>|)(?: <span class="history-size">.*?</span>|)(?: <span class=[\'"]comment[\'"]>\((?:<span class="autocomment">|)(.*?)(?:</span>|)\)</span>)?(?: \(<span class="mw-history-undo">.*?</span>\)|)\s*</li>', re.UNICODE) startFromPage = None thisHistoryDone = False skip = False # Used in determining whether we need to skip the first page RLinkToNextPage = re.compile('&offset=(.*?)&') # Are we getting by Earliest first? 
if reverseOrder: # Check if _versionhistoryearliest exists if not hasattr(self, '_versionhistoryearliest') or forceReload: self._versionhistoryearliest = [] elif getAll and len(self._versionhistoryearliest) == revCount: # Cause a reload, or at least make the loop run thisHistoryDone = False skip = True else: thisHistoryDone = True elif not hasattr(self, '_versionhistory') or forceReload: self._versionhistory = [] elif getAll and len(self._versionhistory) == revCount: # Cause a reload, or at least make the loop run thisHistoryDone = False skip = True else: thisHistoryDone = True while not thisHistoryDone: path = site.family.version_history_address(self.site().language(), self.urlname(), revCount) if reverseOrder: path += '&dir=prev' if startFromPage: path += '&offset=' + startFromPage # this loop will run until the page could be retrieved # Try to retrieve the page until it was successfully loaded (just in case # the server is down or overloaded) # wait for retry_idle_time minutes (growing!) between retries. retry_idle_time = 1 if verbose: if startFromPage: output(u'Continuing to get version history of %s' % self.aslink(forceInterwiki = True)) else: output(u'Getting version history of %s' % self.aslink(forceInterwiki = True)) txt = site.getUrl(path) # save a copy of the text self_txt = txt if reverseOrder: # If we are getting all of the page history... if getAll: if len(self._versionhistoryearliest) == 0: matchObj = RLinkToNextPage.search(self_txt) if matchObj: startFromPage = matchObj.group(1) else: thisHistoryDone = True edits = editR.findall(self_txt) edits.reverse() for edit in edits: self._versionhistoryearliest.append(edit) if len(edits) < revCount: thisHistoryDone = True else: if not skip: edits = editR.findall(self_txt) edits.reverse() for edit in edits: self._versionhistoryearliest.append(edit) if len(edits) < revCount: thisHistoryDone = True matchObj = RLinkToNextPage.search(self_txt) if matchObj: startFromPage = matchObj.group(1) else: thisHistoryDone = True else: # Skip the first page only, skip = False matchObj = RLinkToNextPage.search(self_txt) if matchObj: startFromPage = matchObj.group(1) else: thisHistoryDone = True else: # If we are not getting all, we stop on the first page. for edit in editR.findall(self_txt): self._versionhistoryearliest.append(edit) self._versionhistoryearliest.reverse() thisHistoryDone = True else: # If we are getting all of the page history... if getAll: if len(self._versionhistory) == 0: matchObj = RLinkToNextPage.search(self_txt) if matchObj: startFromPage = matchObj.group(1) else: thisHistoryDone = True edits = editR.findall(self_txt) for edit in edits: self._versionhistory.append(edit) if len(edits) < revCount: thisHistoryDone = True else: if not skip: edits = editR.findall(self_txt) for edit in edits: self._versionhistory.append(edit) if len(edits) < revCount: thisHistoryDone = True matchObj = RLinkToNextPage.findall(self_txt) if len(matchObj) >= 2: startFromPage = matchObj[1] else: thisHistoryDone = True else: # Skip the first page only, skip = False matchObj = RLinkToNextPage.search(self_txt) if matchObj: startFromPage = matchObj.group(1) else: thisHistoryDone = True else: # If we are not getting all, we stop on the first page. 
for edit in editR.findall(self_txt): self._versionhistory.append(edit) thisHistoryDone = True if reverseOrder: # Return only revCount edits, even if the version history is extensive if len(self._versionhistoryearliest) > revCount and not getAll: return self._versionhistoryearliest[0:revCount] return self._versionhistoryearliest # Return only revCount edits, even if the version history is extensive if len(self._versionhistory) > revCount and not getAll: return self._versionhistory[0:revCount] return self._versionhistory def getVersionHistoryTable(self, forceReload=False, reverseOrder=False, getAll=False, revCount=500): """Return the version history as a wiki table.""" result = '{| border="1"\n' result += '! oldid || date/time || username || edit summary\n' for oldid, time, username, summary in self.getVersionHistory(forceReload = forceReload, reverseOrder = reverseOrder, getAll = getAll, revCount = revCount): result += '|----\n' result += '| %s || %s || %s || <nowiki>%s\n' % (oldid, time, username, summary) result += '|}\n' return result def fullVersionHistory(self): """ Return all previous versions including wikitext. Gives a list of tuples consisting of revision ID, edit date/time, user name and content """ address = self.site().export_address() predata = { 'action': 'submit', 'pages': self.title() } get_throttle(requestsize = 10) now = time.time() if self.site().hostname() in config.authenticate.keys(): predata["Content-type"] = "application/x-www-form-urlencoded" predata["User-agent"] = useragent data = self.site().urlEncode(predata) response = urllib2.urlopen(urllib2.Request('http://' + self.site().hostname() + address, data)) data = response.read() else: response, data = self.site().postForm(address, predata) data = data.encode(self.site().encoding()) # get_throttle.setDelay(time.time() - now) output = [] # TODO: parse XML using an actual XML parser instead of regex! r = re.compile("\<revision\>.*?\<id\>(?P<id>.*?)\<\/id\>.*?\<timestamp\>(?P<timestamp>.*?)\<\/timestamp\>.*?\<(?:ip|username)\>(?P<user>.*?)\</(?:ip|username)\>.*?\<text.*?\>(?P<content>.*?)\<\/text\>",re.DOTALL) #r = re.compile("\<revision\>.*?\<timestamp\>(.*?)\<\/timestamp\>.*?\<(?:ip|username)\>(.*?)\<",re.DOTALL) return [ (match.group('id'), match.group('timestamp'), unescape(match.group('user')), unescape(match.group('content'))) for match in r.finditer(data) ] def contributingUsers(self): """Return a set of usernames (or IPs) of users who edited this page.""" edits = self.getVersionHistory() users = set([edit[2] for edit in edits]) return users def move(self, newtitle, reason=None, movetalkpage=True, sysop=False, throttle=True, deleteAndMove=False, safe=True, fixredirects=True, leaveRedirect=True): """Move this page to new title given by newtitle. If safe, don't try to move and delete if not directly requested.
* fixredirects has no effect in MW < 1.13""" # Login try: self.get() except: pass sysop = self._getActionUser(action = 'move', restriction = self.moveRestriction, sysop = False) if deleteAndMove: sysop = self._getActionUser(action = 'delete', restriction = '', sysop = True) # Check blocks self.site().checkBlocks(sysop = sysop) if throttle: put_throttle() if reason is None: reason = input(u'Please enter a reason for the move:') if self.isTalkPage(): movetalkpage = False host = self.site().hostname() address = self.site().move_address() token = self.site().getToken(self, sysop = sysop) predata = { 'wpOldTitle': self.title().encode(self.site().encoding()), 'wpNewTitle': newtitle.encode(self.site().encoding()), 'wpReason': reason.encode(self.site().encoding()), } if deleteAndMove: predata['wpDeleteAndMove'] = self.site().mediawiki_message('delete_and_move_confirm') predata['wpConfirm'] = '1' if movetalkpage: predata['wpMovetalk'] = '1' else: predata['wpMovetalk'] = '0' if self.site().versionnumber() >= 13: if fixredirects: predata['wpFixRedirects'] = '1' else: predata['wpFixRedirects'] = '0' if leaveRedirect: predata['wpLeaveRedirect'] = '1' else: predata['wpLeaveRedirect'] = '0' if token: predata['wpEditToken'] = token if self.site().hostname() in config.authenticate.keys(): predata['Content-type'] = 'application/x-www-form-urlencoded' predata['User-agent'] = useragent data = self.site().urlEncode(predata) response = urllib2.urlopen(urllib2.Request(self.site().protocol() + '://' + self.site().hostname() + address, data)) data = u'' else: response, data = self.site().postForm(address, predata, sysop = sysop) if data == u'' or self.site().mediawiki_message('pagemovedsub') in data: if deleteAndMove: output(u'Page %s moved to %s, deleting the existing page' % (self.title(), newtitle)) else: output(u'Page %s moved to %s' % (self.title(), newtitle)) return True else: self.site().checkBlocks(sysop = sysop) if self.site().mediawiki_message('articleexists') in data or self.site().mediawiki_message('delete_and_move') in data: if safe: output(u'Page move failed: Target page [[%s]] already exists.' % newtitle) return False else: try: # Try to delete and move return self.move(newtitle = newtitle, reason = reason, movetalkpage = movetalkpage, throttle = throttle, deleteAndMove = True) except NoUsername: # We don't have the user rights to delete output(u'Page move failed: Target page [[%s]] already exists.' % newtitle) return False elif not self.exists(): raise NoPage(u'Page move failed: Source page [[%s]] does not exist.' % self.title()) elif Page(self.site(),newtitle).exists(): # XXX : This might be buggy : if the move was successful, the target page *has* been created raise PageNotSaved(u'Page move failed: Target page [[%s]] already exists.' % newtitle) else: output(u'Page move failed for unknown reason.') try: ibegin = data.index('<!-- start content -->') + 22 iend = data.index('<!-- end content -->') except ValueError: # if begin/end markers weren't found, show entire HTML file output(data) else: # otherwise, remove the irrelevant sections data = data[ibegin:iend] output(data) return False def delete(self, reason=None, prompt=True, throttle=True, mark=False): """Deletes the page from the wiki. Requires administrator status. If reason is None, asks for a reason. If prompt is True, asks the user if he wants to delete the page. If the user does not have admin rights and mark is True, the page is marked for deletion instead.
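A hedged usage sketch (title and reason are hypothetical; actual deletion
requires a sysop account, otherwise mark=True only tags the page):

    page = Page(getSite(), u'Spam page')
    page.delete(reason=u'Spam', prompt=False, mark=True)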
""" # Login try: self._getActionUser(action = 'delete', sysop = True) except NoUsername: if mark and self.exists(): text = self.get(get_redirect = True) output(u'Cannot delete page %s - marking the page for deletion instead:' % self.aslink()) # Note: Parameters to {{delete}}, and their meanings, vary from one Wikipedia to another. # If you want or need to use them, you must be careful not to break others. Else don't. self.put(u'{{delete}}\n%s --{{Utente:Pfenner/firma}} 15:18, nov 25, 2009 (CET)\n----\n\n%s' % (reason, text), comment = reason) return else: raise # Check blocks self.site().checkBlocks(sysop = True) if throttle: put_throttle() if reason is None: reason = input(u'Please enter a reason for the deletion:') answer = 'y' if prompt and not hasattr(self.site(), '_noDeletePrompt'): answer = inputChoice(u'Do you want to delete %s?' % self.aslink(forceInterwiki = True), ['yes', 'no', 'all'], ['y', 'N', 'a'], 'N') if answer == 'a': answer = 'y' self.site()._noDeletePrompt = True if answer == 'y': token = self.site().getToken(self, sysop = True) try: d = self.site().api_address() del d except NotImplementedError: config.use_api = False if config.use_api and self.site().versionnumber() >= 12: params = { 'action': 'delete', 'title': self.title(), 'token': token, 'reason': reason, } datas = query.GetData(params, self.site(), sysop = True) if datas.has_key('delete'): output(u'Page %s deleted' % self.aslink(forceInterwiki = True)) return True else: if datas['error']['code'] == 'missingtitle': output(u'Page %s could not be deleted - it doesn\'t exist' % self.aslink(forceInterwiki = True)) else: output(u'Deletion of %s failed for an unknown reason. The response text is:' % self.aslink(forceInterwiki = True)) output('%s' % datas) return False else: host = self.site().hostname() address = self.site().delete_address(self.urlname()) reason = reason.encode(self.site().encoding()) predata = { 'wpDeleteReasonList': 'other', 'wpReason': reason, 'wpComment': reason, 'wpConfirm': '1', 'wpConfirmB': '1' } if token: predata['wpEditToken'] = token if self.site().hostname() in config.authenticate.keys(): predata['Content-type'] = 'application/x-www-form-urlencoded' predata['User-agent'] = useragent data = self.site().urlEncode(predata) response = urllib2.urlopen(urllib2.Request(self.site().protocol() + '://' + self.site().hostname() + address, data)) data = u'' else: response, data = self.site().postForm(address, predata, sysop = True) if data: self.site().checkBlocks(sysop = True) if self.site().mediawiki_message('actioncomplete') in data: output(u'Page %s deleted' % self.aslink(forceInterwiki = True)) return True elif self.site().mediawiki_message('cannotdelete') in data: output(u'Page %s could not be deleted - it doesn\'t exist' % self.aslink(forceInterwiki = True)) return False else: output(u'Deletion of %s failed for an unknown reason. The response text is:' % self.aslink(forceInterwiki = True)) try: ibegin = data.index('<!-- start content -->') + 22 iend = data.index('<!-- end content -->') except ValueError: # if begin/end markers weren't found, show entire HTML file output(data) else: # otherwise, remove the irrelevant sections data = data[ibegin:iend] output(data) return False def loadDeletedRevisions(self): """Retrieve all deleted revisions for this Page from Special/Undelete. Stores all revisions' timestamps, dates, editors and comments. Returns list of timestamps (which can be used to retrieve revisions later on). 
""" # Login self._getActionUser(action = 'deletedhistory', sysop = True) #TODO: Handle image file revisions too. output(u'Loading list of deleted revisions for [[%s]]...' % self.title()) address = self.site().undelete_view_address(self.urlname()) text = self.site().getUrl(address, sysop = True) #TODO: Handle non-existent pages etc rxRevs = re.compile(r'<input name="(?P<ts>(?:ts|fileid)\d+)".*?title=".*?">(?P<date>.*?)</a>.*?title=".*?">(?P<editor>.*?)</a>.*?<span class="comment">\((?P<comment>.*?)\)</span>',re.DOTALL) self._deletedRevs = {} for rev in rxRevs.finditer(text): self._deletedRevs[rev.group('ts')] = [ rev.group('date'), rev.group('editor'), rev.group('comment'), None, #Revision text False, #Restoration marker ] self._deletedRevsModified = False return self._deletedRevs.keys() def getDeletedRevision(self, timestamp, retrieveText=False): """Return a particular deleted revision by timestamp. Return value is a list of [date, editor, comment, text, restoration marker]. text will be None, unless retrieveText is True (or has been retrieved earlier). """ if self._deletedRevs is None: self.loadDeletedRevisions() if timestamp not in self._deletedRevs: #TODO: Throw an exception instead? return None if retrieveText and not self._deletedRevs[timestamp][3] and timestamp[:2]=='ts': # Login self._getActionUser(action = 'delete', sysop = True) output(u'Retrieving text of deleted revision...') address = self.site().undelete_view_address(self.urlname(),timestamp) text = self.site().getUrl(address, sysop = True) und = re.search('<textarea readonly="1" cols="80" rows="25">(.*?)</textarea><div><form method="post"',text,re.DOTALL) if und: self._deletedRevs[timestamp][3] = und.group(1) return self._deletedRevs[timestamp] def markDeletedRevision(self, timestamp, undelete=True): """Mark the revision identified by timestamp for undeletion. If undelete is False, mark the revision to remain deleted. """ if self._deletedRevs is None: self.loadDeletedRevisions() if timestamp not in self._deletedRevs: #TODO: Throw an exception? return None self._deletedRevs[timestamp][4] = undelete self._deletedRevsModified = True def undelete(self, comment='', throttle=True): """Undeletes page based on the undeletion markers set by previous calls. If no calls have been made since loadDeletedRevisions(), everything will be restored. Simplest case: wikipedia.Page(...).undelete('This will restore all revisions') More complex: pg = wikipedia.Page(...) revs = pg.loadDeletedRevsions() for rev in revs: if ... #decide whether to undelete a revision pg.markDeletedRevision(rev) #mark for undeletion pg.undelete('This will restore only selected revisions.') """ # Login self._getActionUser(action = 'undelete', sysop = True) # Check blocks self.site().checkBlocks(sysop = True) if throttle: put_throttle() address = self.site().undelete_address() token = self.site().getToken(self, sysop=True) formdata = { 'target': self.title(), 'wpComment': comment, 'wpEditToken': token, 'restore': self.site().mediawiki_message('undeletebtn') } if self._deletedRevs is not None and self._deletedRevsModified: for ts in self._deletedRevs: if self._deletedRevs[ts][4]: formdata['ts'+ts] = '1' self._deletedRevs = None #TODO: Check for errors below (have we succeeded? etc): result = self.site().postForm(address,formdata,sysop=True) output(u'Page %s undeleted' % self.aslink()) return result def protect(self, edit='sysop', move='sysop', unprotect=False, reason=None, duration = None, cascading = False, prompt=True, throttle=True): """(Un)protect a wiki page. 
Requires administrator status. If reason is None, asks for a reason. If prompt is True, asks the user if he wants to protect the page. Valid values for edit and move are: * '' (equivalent to 'none') * 'autoconfirmed' * 'sysop' """ # Login self._getActionUser(action = 'protect', sysop = True) # Check blocks self.site().checkBlocks(sysop = True) address = self.site().protect_address(self.urlname()) if unprotect: address = self.site().unprotect_address(self.urlname()) # unprotect_address is actually an alias for protect_address... edit = move = '' else: edit, move = edit.lower(), move.lower() if throttle: put_throttle() if reason is None: reason = input( u'Please enter a reason for the change of the protection level:') reason = reason.encode(self.site().encoding()) answer = 'y' if prompt and not hasattr(self.site(), '_noProtectPrompt'): answer = inputChoice( u'Do you want to change the protection level of %s?' % self.aslink(forceInterwiki = True), ['Yes', 'No', 'All'], ['Y', 'N', 'A'], 'N') if answer == 'a': answer = 'y' self.site()._noProtectPrompt = True if answer == 'y': host = self.site().hostname() token = self.site().getToken(self, sysop = True) # Translate 'none' to '' if edit == 'none': edit = '' if move == 'none': move = '' # Translate no duration to infinite if duration == 'none' or duration is None: duration = 'infinite' # Get cascading if cascading == False: cascading = '0' else: if edit != 'sysop' or move != 'sysop': # You can't combine cascading protection with a level below sysop, so prevent the error cascading = '0' output(u"NOTE: Cascading protection requires sysop-level edit and move protection; setting cascading \"off\".") else: cascading = '1' predata = { 'mwProtect-cascade': cascading, 'mwProtect-level-edit': edit, 'mwProtect-level-move': move, 'mwProtect-reason': reason, 'mwProtect-expiry': duration, } if token: predata['wpEditToken'] = token if self.site().hostname() in config.authenticate.keys(): predata["Content-type"] = "application/x-www-form-urlencoded" predata["User-agent"] = useragent data = self.site().urlEncode(predata) response = urllib2.urlopen( urllib2.Request( self.site().protocol() + '://' + self.site().hostname() + address, data)) data = u'' else: response, data = self.site().postForm(address, predata, sysop=True) if response.status == 302 and not data: output(u'Changed protection level of page %s.' % self.aslink()) return True else: # Normally, we expect a 302 with no data, so this means an error self.site().checkBlocks(sysop = True) output(u'Failed to change protection level of page %s:' % self.aslink()) output(u"HTTP response code %s" % response.status) output(data) return False def removeImage(self, image, put=False, summary=None, safe=True): """Remove all occurrences of an image from this Page.""" # TODO: this should be grouped with other functions that operate on # wiki-text rather than the Page object return self.replaceImage(image, None, put, summary, safe) def replaceImage(self, image, replacement=None, put=False, summary=None, safe=True): """Replace all occurrences of an image by another image. Giving None as argument for replacement will delink instead of replace. The argument image must be without namespace and all spaces replaced by underscores. If put is False, the new text will be returned. If put is True, the edits will be saved to the wiki and True will be returned on success, and otherwise False. Edit errors propagate.
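A minimal sketch (file names and summary are hypothetical):

    page.replaceImage(u'Old_image.jpg', u'New_image.jpg', put=True,
                      summary=u'Replacing superseded image')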
""" # TODO: this should be grouped with other functions that operate on # wiki-text rather than the Page object # Copyright (c) Orgullomoore, Bryan # TODO: document and simplify the code site = self.site() text = self.get() new_text = text def capitalizationPattern(s): """ Given a string, creates a pattern that matches the string, with the first letter case-insensitive if capitalization is switched on on the site you're working on. """ if self.site().nocapitalize: return re.escape(s) else: return ur'(?:[%s%s]%s)' % (re.escape(s[0].upper()), re.escape(s[0].lower()), re.escape(s[1:])) namespaces = set(site.namespace(6, all = True) + site.namespace(-2, all = True)) # note that the colon is already included here namespacePattern = ur'\s*(?:%s)\s*\:\s*' % u'|'.join(namespaces) imagePattern = u'(%s)' % capitalizationPattern(image).replace(r'\_', '[ _]') def filename_replacer(match): if replacement is None: return u'' else: old = match.group() return old[:match.start('filename')] + replacement + old[match.end('filename'):] # The group params contains parameters such as thumb and 200px, as well # as the image caption. The caption can contain wiki links, but each # link has to be closed properly. paramPattern = r'(?:\|(?:(?!\[\[).|\[\[.*?\]\])*?)' rImage = re.compile(ur'\[\[(?P<namespace>%s)(?P<filename>%s)(?P<params>%s*?)\]\]' % (namespacePattern, imagePattern, paramPattern)) if replacement is None: new_text = rImage.sub('', new_text) else: new_text = rImage.sub('[[\g<namespace>%s\g<params>]]' % replacement, new_text) # Remove the image from galleries galleryR = re.compile(r'(?is)<gallery>(?P<items>.*?)</gallery>') galleryItemR = re.compile(r'(?m)^%s?(?P<filename>%s)\s*(?P<label>\|.*?)?\s*$' % (namespacePattern, imagePattern)) def gallery_replacer(match): return ur'<gallery>%s<gallery>' % galleryItemR.sub(filename_replacer, match.group('items')) new_text = galleryR.sub(gallery_replacer, new_text) if (text == new_text) or (not safe): # All previous steps did not work, so the image is # likely embedded in a complicated template. # Note: this regular expression can't handle nested templates. templateR = re.compile(ur'(?s)\{\{(?P<contents>.*?)\}\}') fileReferenceR = re.compile(u'%s(?P<filename>(?:%s)?)' % (namespacePattern, imagePattern)) def template_replacer(match): return fileReferenceR.sub(filename_replacer, match.group(0)) new_text = templateR.sub(template_replacer, new_text) if put: if text != new_text: # Save to the wiki self.put(new_text, summary) return True return False else: return new_text def getLatestEditors(self, limit = 1): """ Function to get the last editors of a page """ #action=query&prop=revisions&titles=API&rvprop=timestamp|user|comment params = { 'action' :'query', 'prop' :'revisions', 'rvprop' :'user|timestamp', 'rvlimit' :limit, 'titles' :self.title(), } try: data = query.GetData(params, self.site(), encodeTitle = False)['query']['pages'] except KeyError: raise NoPage(u'API Error, nothing found in the APIs') # We don't know the page's id, if any other better idea please change it return data[data.keys()[0]][u'revisions'] class ImagePage(Page): """A subclass of Page representing an image descriptor wiki page. Supports the same interface as Page, with the following added methods: getImagePageHtml : Download image page and return raw HTML text. fileURL : Return the URL for the image described on this page. fileIsOnCommons : Return True if image stored on Wikimedia Commons. fileIsShared : Return True if image stored on Wikitravel shared repository. 
getFileMd5Sum : Return image file's MD5 checksum. getFileVersionHistory : Return the image file's version history. getFileVersionHistoryTable: Return the version history in the form of a wiki table. usingPages : Yield Pages on which the image is displayed. """ def __init__(self, site, title, insite = None): Page.__init__(self, site, title, insite, defaultNamespace=6) if self.namespace() != 6: raise ValueError(u'BUG: %s is not in the image namespace!' % title) self._imagePageHtml = None def getImagePageHtml(self): """ Download the image page, and return the HTML, as a unicode string. Caches the HTML code, so that if you run this method twice on the same ImagePage object, the page will only be downloaded once. """ if not self._imagePageHtml: path = self.site().get_address(self.urlname()) self._imagePageHtml = self.site().getUrl(path) return self._imagePageHtml def fileUrl(self): """Return the URL for the image described on this page.""" # There are three types of image pages: # * normal, small images with links like: filename.png (10KB, MIME type: image/png) # * normal, large images with links like: Download high resolution version (1024x768, 200 KB) # * SVG images with links like: filename.svg (1KB, MIME type: image/svg) # This regular expression seems to work with all of them. # The part after the | is required for copying .ogg files from en:, as they do not # have a "full image link" div. This might change in the future; on commons, there # is a full image link for .ogg and .mid files. #*********************** #change to API query: action=query&titles=File:wiki.jpg&prop=imageinfo&iiprop=url params = { 'action' :'query', 'prop' :'imageinfo', 'titles' :self.title(), 'iiprop' :'url', } imagedata = query.GetData(params, self.site(), encodeTitle = False) try: url = imagedata['query']['pages'].values()[0]['imageinfo'][0]['url'] return url # urlR = re.compile(r'<div class="fullImageLink" id="file">.*?<a href="(?P<url>[^ ]+?)"(?! class="image")|<span class="dangerousLink"><a href="(?P<url2>.+?)"', re.DOTALL) # m = urlR.search(self.getImagePageHtml()) # url = m.group('url') or m.group('url2') except KeyError: raise NoPage(u'Image file URL for %s not found.' % self.aslink(forceInterwiki = True)) return url def fileIsOnCommons(self): """Return True if the image is stored on Wikimedia Commons""" return self.fileUrl().startswith( u'http://upload.wikimedia.org/wikipedia/commons/') def fileIsShared(self): """Return True if image is stored on Wikitravel shared repository.""" if 'wikitravel_shared' in self.site().shared_image_repository(): return self.fileUrl().startswith( u'http://wikitravel.org/upload/shared/') return self.fileIsOnCommons() # FIXME: MD5 might be performed on not complete file due to server disconnection # (see bug #1795683). def getFileMd5Sum(self): """Return image file's MD5 checksum.""" uo = MyURLopener() f = uo.open(self.fileUrl()) md5Checksum = md5(f.read()).hexdigest() return md5Checksum def getFileVersionHistory(self): """Return the image file's version history. Return value is a list of tuples containing (timestamp, username, resolution, filesize, comment). 
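For example (the ImagePage instance `image` and all field values are
illustrative only):

    for (datetime, username, resolution, size, comment) in image.getFileVersionHistory():
        output(u'%s uploaded by %s (%s)' % (datetime, username, size))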
""" result = [] history = re.search('(?s)<table class="wikitable filehistory">.+?</table>', self.getImagePageHtml()) if history: lineR = re.compile(r'<tr>(?:<td>.*?</td>){1,2}<td.*?><a href=".+?">(?P<datetime>.+?)</a></td><td>.*?(?P<resolution>\d+\xd7\d+) <span.*?>\((?P<filesize>.+?)\)</span></td><td><a href=".+?"(?: class="new"|) title=".+?">(?P<username>.+?)</a>.*?</td><td>(?:.*?<span class="comment">\((?P<comment>.*?)\)</span>)?</td></tr>') if not lineR.search(history.group()): # b/c code lineR = re.compile(r'<tr>(?:<td>.*?</td>){1,2}<td><a href=".+?">(?P<datetime>.+?)</a></td><td><a href=".+?"(?: class="new"|) title=".+?">(?P<username>.+?)</a>.*?</td><td>(?P<resolution>.*?)</td><td class=".+?">(?P<filesize>.+?)</td><td>(?P<comment>.*?)</td></tr>') else: # backward compatible code history = re.search('(?s)<ul class="special">.+?</ul>', self.getImagePageHtml()) if history: lineR = re.compile('<li> \(.+?\) \(.+?\) <a href=".+?" title=".+?">(?P<datetime>.+?)</a> . . <a href=".+?" title=".+?">(?P<username>.+?)</a> \(.+?\) . . (?P<resolution>\d+.+?\d+) \((?P<filesize>[\d,\.]+) .+?\)( <span class="comment">(?P<comment>.*?)</span>)?</li>') if history: for match in lineR.finditer(history.group()): datetime = match.group('datetime') username = match.group('username') resolution = match.group('resolution') size = match.group('filesize') comment = match.group('comment') or '' result.append((datetime, username, resolution, size, comment)) return result def getLatestUploader(self): """ Function that uses the APIs to detect the latest uploader of the image """ params = { 'action' :'query', 'prop' :'imageinfo', 'titles' :self.title(), } data = query.GetData(params, self.site(), encodeTitle = False) try: # We don't know the page's id, if any other better idea please change it pageid = data['query']['pages'].keys()[0] nick = data['query']['pages'][pageid][u'imageinfo'][0][u'user'] timestamp = data['query']['pages'][pageid][u'imageinfo'][0][u'timestamp'] return [nick, timestamp] except KeyError: raise NoPage(u'API Error, nothing found in the APIs') def getHash(self): """ Function that return the Hash of an file in oder to understand if two Files are the same or not. """ if self.exists(): params = { 'action' :'query', 'titles' :self.title(), 'prop' :'imageinfo', 'iiprop' :'sha1', } # First of all we need the Hash that identify an image data = query.GetData(params, self.site(), encodeTitle = False) pageid = data['query']['pages'].keys()[0] try: hash_found = data['query']['pages'][pageid][u'imageinfo'][0][u'sha1'] except (KeyError, IndexError): try: self.get() except NoPage: output(u'%s has been deleted before getting the Hash. Skipping...' % self.title()) return None except IsRedirectPage: output("Skipping %s because it's a redirect." % self.title()) return None else: raise NoHash('No Hash found in the APIs! Maybe the regex to catch it is wrong or someone has changed the APIs structure.') else: return hash_found else: output(u'File deleted before getting the Hash. Skipping...') return None def getFileVersionHistoryTable(self): """Return the version history in the form of a wiki table.""" lines = [] for (datetime, username, resolution, size, comment) in self.getFileVersionHistory(): lines.append('| %s || %s || %s || %s || %s' % (datetime, username, resolution, size, comment)) return u'{| border="1"\n! 
date/time || username || resolution || size || edit summary\n|----\n' + u'\n|----\n'.join(lines) + '\n|}' def usingPages(self): """Yield Pages on which the image is displayed.""" titleList = re.search('(?s)<h2 id="filelinks">.+?<!-- end content -->', self.getImagePageHtml()).group() lineR = re.compile( '<li><a href="[^\"]+" title=".+?">(?P<title>.+?)</a></li>') for match in lineR.finditer(titleList): try: yield Page(self.site(), match.group('title')) except InvalidTitle: output( u"Image description page %s contains invalid reference to [[%s]]." % (self.title(), match.group('title'))) class _GetAll(object): """For internal use only - supports getall() function""" def __init__(self, site, pages, throttle, force): self.site = site self.pages = [] self.throttle = throttle self.force = force self.sleeptime = 15 for page in pages: if (not hasattr(page, '_contents') and not hasattr(page, '_getexception')) or force: self.pages.append(page) elif verbose: output(u"BUGWARNING: %s already done!" % page.aslink()) def sleep(self): time.sleep(self.sleeptime) if self.sleeptime <= 60: self.sleeptime += 15 elif self.sleeptime < 360: self.sleeptime += 60 def run(self): if self.pages: while True: try: data = self.getData() except (socket.error, httplib.BadStatusLine, ServerError): # Print the traceback of the caught exception output(u''.join(traceback.format_exception(*sys.exc_info()))) output(u'DBG> got network error in _GetAll.run. ' \ 'Sleeping for %d seconds...' % self.sleeptime) self.sleep() else: if "<title>Wiki does not exist</title>" in data: raise NoSuchSite(u'Wiki %s does not exist yet' % self.site) elif "</mediawiki>" not in data[-20:]: # HTML error page got thrown because of an internal # error when fetching a revision. output(u'Received incomplete XML data. ' \ 'Sleeping for %d seconds...' % self.sleeptime) self.sleep() elif "<siteinfo>" not in data: # This probably means we got a 'temporarily unavailable' output(u'Got incorrect export page. ' \ 'Sleeping for %d seconds...'
% self.sleeptime) self.sleep() else: break R = re.compile(r"\s*<\?xml([^>]*)\?>(.*)",re.DOTALL) m = R.match(data) if m: data = m.group(2) handler = xmlreader.MediaWikiXmlHandler() handler.setCallback(self.oneDone) handler.setHeaderCallback(self.headerDone) #f = open("backup.txt", "w") #f.write(data) #f.close() try: xml.sax.parseString(data, handler) except (xml.sax._exceptions.SAXParseException, ValueError), err: debugDump( 'SaxParseBug', self.site, err, data ) raise except PageNotFound: return # All of the ones that have not been found apparently do not exist for pl in self.pages: if not hasattr(pl,'_contents') and not hasattr(pl,'_getexception'): pl._getexception = NoPage def oneDone(self, entry): title = entry.title username = entry.username ipedit = entry.ipedit timestamp = entry.timestamp text = entry.text editRestriction = entry.editRestriction moveRestriction = entry.moveRestriction revisionId = entry.revisionid page = Page(self.site, title) successful = False for page2 in self.pages: if page2.sectionFreeTitle() == page.sectionFreeTitle(): if not (hasattr(page2,'_contents') or hasattr(page2,'_getexception')) or self.force: page2.editRestriction = entry.editRestriction page2.moveRestriction = entry.moveRestriction if editRestriction == 'autoconfirmed': page2._editrestriction = True page2._permalink = entry.revisionid page2._userName = username page2._ipedit = ipedit page2._revisionId = revisionId page2._editTime = timestamp section = page2.section() # Store the content page2._contents = text m = self.site.redirectRegex().match(text) if m: ## output(u"%s is a redirect" % page2.aslink()) redirectto = m.group(1) if section and not "#" in redirectto: redirectto = redirectto+"#"+section page2._getexception = IsRedirectPage page2._redirarg = redirectto # This is used for checking deletion conflict. # Use the data loading time. page2._startTime = time.strftime('%Y%m%d%H%M%S', time.gmtime()) if section: m = re.search("\.3D\_*(\.27\.27+)?(\.5B\.5B)?\_*%s\_*(\.5B\.5B)?(\.27\.27+)?\_*\.3D" % re.escape(section), sectionencode(text,page2.site().encoding())) if not m: try: page2._getexception output(u"WARNING: Section not found: %s" % page2.aslink(forceInterwiki = True)) except AttributeError: # There is no exception yet page2._getexception = SectionError successful = True # Note that there is no break here. The reason is that there # might be duplicates in the pages list. 
if not successful: output(u"BUG>> title %s (%s) not found in list" % (title, page.aslink(forceInterwiki=True))) output(u'Expected one of: %s' % u','.join([page2.aslink(forceInterwiki=True) for page2 in self.pages])) raise PageNotFound def headerDone(self, header): # Verify version version = header.generator p = re.compile('^MediaWiki (.+)$') m = p.match(version) if m: version = m.group(1) if version != self.site.version(): output(u'WARNING: Family file %s contains version number %s, but it should be %s' % (self.site.family.name, self.site.version(), version)) # Verify case if self.site.nocapitalize: case = 'case-sensitive' else: case = 'first-letter' if case != header.case.strip(): output(u'WARNING: Family file %s contains case %s, but it should be %s' % (self.site.family.name, case, header.case.strip())) # Verify namespaces lang = self.site.lang ids = header.namespaces.keys() ids.sort() for id in ids: nshdr = header.namespaces[id] if self.site.family.isDefinedNSLanguage(id, lang): ns = self.site.namespace(id) or u'' if ns != nshdr: try: dflt = self.site.family.namespace('_default', id) except KeyError: dflt = u'' if not ns and not dflt: flag = u"is not set, but should be '%s'" % nshdr elif dflt == ns: flag = u"is set to default ('%s'), but should be '%s'" % (ns, nshdr) elif dflt == nshdr: flag = u"is '%s', but should be removed (default value '%s')" % (ns, nshdr) else: flag = u"is '%s', but should be '%s'" % (ns, nshdr) output(u"WARNING: Outdated family file %s: namespace['%s'][%i] %s" % (self.site.family.name, lang, id, flag)) # self.site.family.namespaces[id][lang] = nshdr else: output(u"WARNING: Missing namespace in family file %s: namespace['%s'][%i] (it is set to '%s')" % (self.site.family.name, lang, id, nshdr)) for id in self.site.family.namespaces: if self.site.family.isDefinedNSLanguage(id, lang) and id not in header.namespaces: output(u"WARNING: Family file %s includes namespace['%s'][%i], but it should be removed (namespace doesn't exist in the site)" % (self.site.family.name, lang, id)) def getData(self): address = self.site.export_address() pagenames = [page.sectionFreeTitle() for page in self.pages] # We need to use X convention for requested page titles. if self.site.lang == 'eo': pagenames = [encodeEsperantoX(pagetitle) for pagetitle in pagenames] pagenames = u'\r\n'.join(pagenames) if type(pagenames) is not unicode: output(u'Warning: xmlreader.WikipediaXMLHandler.getData() got non-unicode page names. Please report this.') print pagenames # convert Unicode string to the encoding used on that wiki pagenames = pagenames.encode(self.site.encoding()) predata = { 'action': 'submit', 'pages': pagenames, 'curonly': 'True', } # Slow ourselves down get_throttle(requestsize = len(self.pages)) # Now make the actual request to the server now = time.time() if self.site.hostname() in config.authenticate.keys(): predata["Content-type"] = "application/x-www-form-urlencoded" predata["User-agent"] = useragent data = self.site.urlEncode(predata) response = urllib2.urlopen(urllib2.Request(self.site.protocol() + '://' + self.site.hostname() + address, data)) data = response.read() else: response, data = self.site.postForm(address, predata) # The XML parser doesn't expect a Unicode string, but an encoded one, # so we'll encode it back. 
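# Illustrative note ('utf-8' stands in for whatever encoding the wiki
# reports): xml.sax.parseString() wants a byte string, so a unicode
# response is re-encoded first, e.g.
#   xml.sax.parseString(data.encode('utf-8'), handler)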
data = data.encode(self.site.encoding()) # get_throttle.setDelay(time.time() - now) return data def getall(site, pages, throttle=True, force=False): """Use Special:Export to bulk-retrieve a group of pages from site Arguments: site = Site object pages = iterable that yields Page objects """ # TODO: why isn't this a Site method? pages = list(pages) # if pages is an iterator, we need to make it a list output(u'Getting %d pages from %s...' % (len(pages), site)) _GetAll(site, pages, throttle, force).run() # Library functions def unescape(s): """Replace escaped HTML-special characters by their originals""" if '&' not in s: return s s = s.replace("&lt;", "<") s = s.replace("&gt;", ">") s = s.replace("&apos;", "'") s = s.replace("&quot;", '"') s = s.replace("&amp;", "&") # Must be last return s def setAction(s): """Set a summary to use for changed page submissions""" global action action = s # Default action setAction('Wikipedia python library') def setUserAgent(s): """Set a User-agent: header passed to the HTTP server""" global useragent useragent = s # Default User-agent setUserAgent('PythonWikipediaBot/1.0') # Mechanics to slow down page download rate. class Throttle(object): """For internal use only - control rate of access to wiki server Calling this object blocks the calling thread until at least 'delay' seconds have passed since the previous call. The framework initiates two Throttle objects: get_throttle to control the rate of read access, and put_throttle to control the rate of write access. """ def __init__(self, mindelay=config.minthrottle, maxdelay=config.maxthrottle, multiplydelay=True): self.lock = threading.RLock() self.mindelay = mindelay self.maxdelay = maxdelay self.pid = False # If self.pid remains False, we're not checking for multiple processes self.now = 0 self.next_multiplicity = 1.0 self.checkdelay = 240 # Check the file with processes again after this many seconds self.dropdelay = 360 # Drop processes from the list that have not made a check in this many seconds self.releasepid = 100000 # Free the process id self.lastwait = 0.0 self.delay = 0 if multiplydelay: self.checkMultiplicity() self.setDelay(mindelay) def logfn(self): return config.datafilepath('logs', 'throttle.log') def checkMultiplicity(self): self.lock.acquire() try: processes = {} my_pid = 1 count = 1 try: f = open(self.logfn(), 'r') except IOError: if not self.pid: pass else: raise else: now = time.time() for line in f.readlines(): try: line = line.split(' ') pid = int(line[0]) ptime = int(line[1].split('.')[0]) if now - ptime <= self.releasepid: if now - ptime <= self.dropdelay and pid != self.pid: count += 1 processes[pid] = ptime if pid >= my_pid: my_pid = pid+1 except (IndexError,ValueError): pass # Sometimes the file gets corrupted - ignore that line if not self.pid: self.pid = my_pid self.checktime = time.time() processes[self.pid] = self.checktime f = open(self.logfn(), 'w') for p in processes: f.write(str(p)+' '+str(processes[p])+'\n') f.close() self.process_multiplicity = count if verbose: output(u"Checked for running processes. %s processes currently running, including the current process."
% count) finally: self.lock.release() def setDelay(self, delay = config.minthrottle, absolute = False): self.lock.acquire() try: if absolute: self.maxdelay = delay self.mindelay = delay self.delay = delay # Don't count the time we already waited as part of our waiting time :-0 self.now = time.time() finally: self.lock.release() def getDelay(self): thisdelay = self.delay if self.pid: # If self.pid, we're checking for multiple processes if time.time() > self.checktime + self.checkdelay: self.checkMultiplicity() if thisdelay < (self.mindelay * self.next_multiplicity): thisdelay = self.mindelay * self.next_multiplicity elif thisdelay > self.maxdelay: thisdelay = self.maxdelay thisdelay *= self.process_multiplicity return thisdelay def waittime(self): """Calculate the time in seconds we will have to wait if a query would be made right now""" # Take the previous requestsize in account calculating the desired # delay this time thisdelay = self.getDelay() now = time.time() ago = now - self.now if ago < thisdelay: delta = thisdelay - ago return delta else: return 0.0 def drop(self): """Remove me from the list of running bots processes.""" self.checktime = 0 processes = {} try: f = open(self.logfn(), 'r') except IOError: return else: now = time.time() for line in f.readlines(): try: line = line.split(' ') pid = int(line[0]) ptime = int(line[1].split('.')[0]) if now - ptime <= self.releasepid and pid != self.pid: processes[pid] = ptime except (IndexError,ValueError): pass # Sometimes the file gets corrupted - ignore that line f = open(self.logfn(), 'w') for p in processes: f.write(str(p)+' '+str(processes[p])+'\n') f.close() def __call__(self, requestsize=1): """ Block the calling program if the throttle time has not expired. Parameter requestsize is the number of Pages to be read/written; multiply delay time by an appropriate factor. """ self.lock.acquire() try: waittime = self.waittime() # Calculate the multiplicity of the next delay based on how # big the request is that is being posted now. # We want to add "one delay" for each factor of two in the # size of the request. Getting 64 pages at once allows 6 times # the delay time for the server. self.next_multiplicity = math.log(1+requestsize)/math.log(2.0) # Announce the delay if it exceeds a preset limit if waittime > config.noisysleep: output(u"Sleeping for %.1f seconds, %s" % (waittime, time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()))) time.sleep(waittime) self.now = time.time() finally: self.lock.release() # functions to manipulate wikitext strings (by default, all text arguments # should be Unicode) # All return the modified text as a unicode object def replaceExcept(text, old, new, exceptions, caseInsensitive=False, allowoverlap=False, marker = '', site = None): """ Return text with 'old' replaced by 'new', ignoring specified types of text. Skips occurences of 'old' within exceptions; e.g., within nowiki tags or HTML comments. If caseInsensitive is true, then use case insensitive regex matching. If allowoverlap is true, overlapping occurences are all replaced (watch out when using this, it might lead to infinite loops!). Parameters: text - a unicode string old - a compiled regular expression new - a unicode string (which can contain regular expression references), or a function which takes a match object as parameter. See parameter repl of re.sub(). exceptions - a list of strings which signal what to leave out, e.g. 
                     ['math', 'table', 'template']
        caseInsensitive - a boolean
        marker          - a string that will be added to the last replacement;
                          if nothing is changed, it is added at the end

    """
    # Hyperlink regex is defined in weblinkchecker.py
    import weblinkchecker
    if site is None:
        site = getSite()
    exceptionRegexes = {
        'comment':     re.compile(r'(?s)<!--.*?-->'),
        # section headers
        'header':      re.compile(r'\r\n=+.+=+ *\r\n'),
        'includeonly': re.compile(r'(?is)<includeonly>.*?</includeonly>'),
        'math':        re.compile(r'(?is)<math>.*?</math>'),
        'noinclude':   re.compile(r'(?is)<noinclude>.*?</noinclude>'),
        # wiki tags are ignored inside nowiki tags.
        'nowiki':      re.compile(r'(?is)<nowiki>.*?</nowiki>'),
        # preformatted text
        'pre':         re.compile(r'(?ism)<pre>.*?</pre>'),
        'source':      re.compile(r'(?is)<source .*?</source>'),
        # inline references
        'ref':         re.compile(r'(?ism)<ref[ >].*?</ref>'),
        'timeline':    re.compile(r'(?is)<timeline>.*?</timeline>'),
        # lines that start with a space are shown in a monospace font and
        # have whitespace preserved.
        'startspace':  re.compile(r'(?m)^ (.*?)$'),
        # tables often have whitespace that is used to improve wiki
        # source code readability.
        # TODO: handle nested tables.
        'table':       re.compile(r'(?ims)^{\|.*?^\|}|<table>.*?</table>'),
        # templates with parameters often have whitespace that is used to
        # improve wiki source code readability.
        # 'template':  re.compile(r'(?s){{.*?}}'),
        # The regex above fails on nested templates. This regex can handle
        # templates cascaded up to level 3, but no deeper. For arbitrary
        # depth, we'd need recursion, which can't be done in Python's re.
        # After all, the language of correct parenthesis words is not regular.
        'template':    re.compile(r'(?s){{(({{(({{.*?}})|.)*}})|.)*}}'),
        'hyperlink':   weblinkchecker.compileLinkR(),
        'gallery':     re.compile(r'(?is)<gallery.*?>.*?</gallery>'),
        # this matches internal wikilinks, but also interwiki, categories,
        # and images.
        'link':        re.compile(r'\[\[[^\]\|]*(\|[^\]]*)?\]\]'),
        'interwiki':   re.compile(r'(?i)\[\[(%s)\s?:[^\]]*\]\][\s]*'
                                  % '|'.join(site.validLanguageLinks()
                                             + site.family.obsolete.keys())),
    }
# if we got a string, compile it as a regular expression if type(old) is str or type(old) is unicode: if caseInsensitive: old = re.compile(old, re.IGNORECASE | re.UNICODE) else: old = re.compile(old)
dontTouchRegexes = [] for exc in exceptions: if isinstance(exc, str) or isinstance(exc, unicode): # assume it's a reference to the exceptionRegexes dictionary # defined above. if exc not in exceptionRegexes: raise ValueError("Unknown tag type: " + exc) dontTouchRegexes.append(exceptionRegexes[exc]) else: # assume it's a regular expression dontTouchRegexes.append(exc) index = 0 markerpos = len(text) while True: match = old.search(text, index) if not match: # nothing left to replace break
# check which exception will occur next. nextExceptionMatch = None for dontTouchR in dontTouchRegexes: excMatch = dontTouchR.search(text, index) if excMatch and ( nextExceptionMatch is None or excMatch.start() < nextExceptionMatch.start()): nextExceptionMatch = excMatch
if nextExceptionMatch is not None and nextExceptionMatch.start() <= match.start(): # an HTML comment or text in nowiki tags stands before the next valid match. Skip. index = nextExceptionMatch.end() else: # We found a valid match. Replace it. if callable(new): # the parameter new can be a function which takes the match as a parameter. replacement = new(match) else: # it is not a function, but a string.
# it is a little hack to make \n work. It would be better to fix it # previously, but better than nothing. new = new.replace('\\n', '\n')
# We cannot just insert the new string, as it may contain regex # group references such as \2 or \g<name>. # On the other hand, this approach does not work because it can't # handle lookahead or lookbehind (see bug #1731008): #replacement = old.sub(new, text[match.start():match.end()]) #text = text[:match.start()] + replacement + text[match.end():]
# So we have to process the group references manually. replacement = new
groupR = re.compile(r'\\(?P<number>\d+)|\\g<(?P<name>.+?)>') while True: groupMatch = groupR.search(replacement) if not groupMatch: break groupID = groupMatch.group('name') or int(groupMatch.group('number')) replacement = replacement[:groupMatch.start()] + match.group(groupID) + replacement[groupMatch.end():] text = text[:match.start()] + replacement + text[match.end():]
# continue the search on the remaining text if allowoverlap: index = match.start() + 1 else: index = match.start() + len(replacement) markerpos = match.start() + len(replacement) text = text[:markerpos] + marker + text[markerpos:] return text
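# A minimal usage sketch for replaceExcept() (hypothetical helper, not part
# of the original library; assumes a configured default site so getSite()
# works): the second "foo" sits inside nowiki tags, so only the first one
# is replaced.
def _replaceExcept_example():
    text = u'foo <nowiki>foo</nowiki>'
    # -> u'bar <nowiki>foo</nowiki>'
    return replaceExcept(text, r'foo', u'bar', ['nowiki'])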
def removeDisabledParts(text, tags = ['*']):
    """
    Return text without portions where wiki markup is disabled

    Parts that can/will be removed are --
    * HTML comments
    * nowiki tags
    * pre tags
    * includeonly tags

    The exact set of parts which should be removed can be passed as the
    'tags' parameter, which defaults to all.
    """
    regexes = {
        'comments':    r'<!--.*?-->',
        'includeonly': r'<includeonly>.*?</includeonly>',
        'nowiki':      r'<nowiki>.*?</nowiki>',
        'pre':         r'<pre>.*?</pre>',
        'source':      r'<source .*?</source>',
    }
    if '*' in tags:
        tags = regexes.keys()
    toRemoveR = re.compile('|'.join([regexes[tag] for tag in tags]),
                           re.IGNORECASE | re.DOTALL)
    return toRemoveR.sub('', text)
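# A quick sketch (hypothetical, for illustration only): strip the parts of
# a string in which wiki markup has no effect before scanning it for links.
def _removeDisabledParts_example():
    text = u'[[A]] <!-- [[B]] --> <nowiki>[[C]]</nowiki>'
    # Only [[A]] survives; the comment and the nowiki block are dropped.
    return removeDisabledParts(text)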
def isDisabled(text, index, tags = ['*']):
""" Return True if text[index] is disabled, e.g. by a comment or by nowiki tags.
For the tags parameter, see removeDisabledParts() above. """ # Find a marker that is not already in the text. marker = findmarker(text, '@@', '@') text = text[:index] + marker + text[index:] text = removeDisabledParts(text, tags) return (marker not in text)
def findmarker(text, startwith = u'@', append = u'@'):
# find a string which is not part of text if len(append) <= 0: append = u'@' mymarker = startwith while mymarker in text: mymarker += append return mymarker
def expandmarker(text, marker = '', separator = ''):
# set to remove any number of separator occurrences plus arbitrary # whitespace before, after, and between them, # by allowing to include them into marker. if separator: firstinmarker = text.find(marker) firstinseparator = firstinmarker lenseparator = len(separator) striploopcontinue = True while firstinseparator > 0 and striploopcontinue: striploopcontinue = False if (firstinseparator >= lenseparator) and (separator == text[firstinseparator-lenseparator:firstinseparator]): firstinseparator -= lenseparator striploopcontinue = True elif text[firstinseparator-1] < ' ': firstinseparator -= 1 striploopcontinue = True marker = text[firstinseparator:firstinmarker] + marker return marker
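# Sketch of how the two marker helpers above cooperate (hypothetical
# example, not part of the original library): findmarker() grows '@@' until
# the string no longer occurs in the text, and expandmarker() widens a
# marker backwards over separator occurrences.
def _marker_example():
    body = u'some @@ text'
    marker = findmarker(body, u'@@', u'@')   # -> u'@@@' ('@@' already occurs)
    text = u'entry*' + marker
    # expandmarker() swallows the '*' separator preceding the marker:
    return expandmarker(text, marker, u'*')  # -> u'*@@@'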
# Part of library dealing with interwiki language links

# Note - MediaWiki supports two kinds of interwiki links: interlanguage and
#        interproject. These functions only deal with links to a
#        corresponding page on the same project (e.g., Wikipedia,
#        Wiktionary, etc.) in another language. They do not find or change
#        links to a different project, or any that are formatted as in-line
#        interwiki links (e.g., "[[:es:Articulo]]"). (CONFIRM)
def getLanguageLinks(text, insite = None, pageLink = "[['']]"):
""" Return a dict of interlanguage links found in text.
Dict uses language codes as keys and Page objects as values. Do not call this routine directly, use Page.interwiki() method instead.
""" if insite is None: insite = getSite() result = {} # Ignore interwiki links within nowiki tags, includeonly tags, pre tags, # and HTML comments text = removeDisabledParts(text)
# This regular expression will find every link that is possibly an # interwiki link. # NOTE: language codes are case-insensitive and only consist of basic latin # letters and hyphens. interwikiR = re.compile(r'\[\[([a-zA-Z\-]+)\s?:([^\[\]\n]*)\]\]') for lang, pagetitle in interwikiR.findall(text): lang = lang.lower() # Check if it really is in fact an interwiki link to a known # language, or if it's e.g. a category tag or an internal link if lang in insite.family.obsolete: lang = insite.family.obsolete[lang] if lang in insite.validLanguageLinks(): if '|' in pagetitle: # ignore text after the pipe pagetitle = pagetitle[:pagetitle.index('|')] # we want the actual page objects rather than the titles site = insite.getSite(code = lang) try: result[site] = Page(site, pagetitle, insite = insite) except InvalidTitle: output( u"[getLanguageLinks] Text contains invalid interwiki link %s:%s." % (lang, pagetitle)) continue return result
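# Hypothetical illustration of getLanguageLinks() (assumes a working
# user-config.py so that getSite() can resolve language codes): given
# wikitext with interlanguage links, it returns a {Site: Page} dict.
def _getLanguageLinks_example():
    text = u'Some text.\n[[en:Example]]\n[[de:Beispiel]]'
    links = getLanguageLinks(text)
    for site, page in links.iteritems():
        output(u'%s -> %s' % (site.lang, page.title()))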
def removeLanguageLinks(text, site = None, marker = ''):
"""Return text with all interlanguage links removed.
If a link to an unknown language is encountered, a warning is printed. If a marker is defined, that string is placed at the location of the last occurrence of an interwiki link (at the end if there are no interwiki links).
""" if site is None: site = getSite() if not site.validLanguageLinks(): return text # This regular expression will find every interwiki link, plus trailing # whitespace. languages = '|'.join(site.validLanguageLinks() + site.family.obsolete.keys()) interwikiR = re.compile(r'\[\[(%s)\s?:[^\]]*\]\][\s]*' % languages, re.IGNORECASE) text = replaceExcept(text, interwikiR, , ['nowiki', 'comment', 'math', 'pre', 'source'], marker=marker) return text.strip()
def removeLanguageLinksAndSeparator(text, site = None, marker = '', separator = ''):
    """Return text with all interlanguage links, plus any preceding
    whitespace and separator occurrences, removed.
If a link to an unknown language is encountered, a warning is printed. If a marker is defined, that string is placed at the location of the last occurrence of an interwiki link (at the end if there are no interwiki links).
""" if separator: mymarker = findmarker(text, u'@L@') newtext = removeLanguageLinks(text, site, mymarker) mymarker = expandmarker(newtext, mymarker, separator) return newtext.replace(mymarker, marker) else: return removeLanguageLinks(text, site, marker)
def replaceLanguageLinks(oldtext, new, site = None, addOnly = False, template = False):
"""Replace interlanguage links in the text with a new set of links.
    'new' should be a dict with the Site objects as keys, and Page objects
    as values (i.e., just like the dict returned by getLanguageLinks
    function).
    """
    # Find a marker that is not already in the text.
    marker = findmarker(oldtext, u'@@')
    if site is None:
        site = getSite()
    separator = site.family.interwiki_text_separator
    cseparator = site.family.category_text_separator
    separatorstripped = separator.strip()
    cseparatorstripped = cseparator.strip()
    if addOnly:
        s2 = oldtext
    else:
        s2 = removeLanguageLinksAndSeparator(oldtext, site = site,
                                             marker = marker,
                                             separator = separatorstripped)
    s = interwikiFormat(new, insite = site)
    if s:
        if site.language() in site.family.interwiki_attop:
            newtext = s + separator + s2.replace(marker, '').strip()
        else:
            # calculate what was after the language links on the page
            firstafter = s2.find(marker)
            if firstafter < 0:
                firstafter = len(s2)
            else:
                firstafter += len(marker)
            # Is there any text in the 'after' part that means we should
            # keep it after?
            if "</noinclude>" in s2[firstafter:]:
                if separatorstripped:
                    s = separator + s
                newtext = s2[:firstafter].replace(marker, '') + s \
                          + s2[firstafter:]
            elif site.language() in site.family.categories_last:
                cats = getCategoryLinks(s2, site = site)
                s2 = removeCategoryLinksAndSeparator(
                         s2.replace(marker, '').strip(), site, '',
                         cseparatorstripped) + separator + s
                newtext = replaceCategoryLinks(s2, cats, site=site,
                                               addOnly=True)
            else:
                if template:
                    # Do we have a noinclude at the end of the template?
                    parts = s2.split('</noinclude>')
                    lastpart = parts[-1]
                    if re.match('\s*%s' % marker, lastpart):
                        # Put the langlinks back into the noinclude's
                        regexp = re.compile('</noinclude>\s*%s' % marker)
                        newtext = regexp.sub(s + '</noinclude>', s2)
                    else:
                        # Put the langlinks at the end, inside noinclude's
                        newtext = s2.replace(marker, '').strip() + separator \
                                  + u'<noinclude>\n%s\n</noinclude>' % s
                else:
                    newtext = s2.replace(marker, '').strip() + separator + s
    else:
        newtext = s2.replace(marker, '')
    return newtext
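# A sketch of the typical update cycle for interlanguage links (hypothetical
# flow; assumes a configured default site): read the old links, modify the
# dict, then write the new set back into the page text.
def _replaceLanguageLinks_example(page):
    text = page.get()
    links = getLanguageLinks(text, insite = page.site())
    links[getSite('de')] = Page(getSite('de'), u'Beispiel')
    return replaceLanguageLinks(text, links, site = page.site())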
def interwikiFormat(links, insite = None):
"""Convert interwiki link dict into a wikitext string.
'links' should be a dict with the Site objects as keys, and Page objects as values.
    Return a unicode string that is formatted for inclusion in insite
    (defaulting to the current site).
    """
    if insite is None:
        insite = getSite()
    if not links:
        return u''
ar = interwikiSort(links.keys(), insite) s = [] for site in ar: try: link = links[site].aslink(forceInterwiki=True).replace('[[:', '[[') s.append(link) except AttributeError: s.append(getSite(site).linkto(links[site], othersite=insite)) if insite.lang in insite.family.interwiki_on_one_line: sep = u' ' else: sep = u'\r\n' s=sep.join(s) + u'\r\n' return s
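# Hypothetical illustration: interwikiFormat() turns a {Site: Page} dict
# into the text block that gets appended to an article, sorted according to
# the local conventions by interwikiSort() below (assumes a configured
# user-config.py).
def _interwikiFormat_example():
    links = {getSite('de'): Page(getSite('de'), u'Beispiel'),
             getSite('fr'): Page(getSite('fr'), u'Exemple')}
    return interwikiFormat(links, insite = getSite('en'))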
# Sort sites according to local interwiki sort logic
def interwikiSort(sites, insite = None):
if insite is None: insite = getSite() if not sites: return []
sites.sort() putfirst = insite.interwiki_putfirst() if putfirst: #In this case I might have to change the order firstsites = [] for code in putfirst: # The code may not exist in this family? if code in insite.family.obsolete: code = insite.family.obsolete[code] if code in insite.validLanguageLinks(): site = insite.getSite(code = code) if site in sites: del sites[sites.index(site)] firstsites = firstsites + [site] sites = firstsites + sites if insite.interwiki_putfirst_doubled(sites): #some implementations return False sites = insite.interwiki_putfirst_doubled(sites) + sites
return sites
# Wikitext manipulation functions dealing with category links
def getCategoryLinks(text, site):
    """Return a list of category links found in text.

    List contains Category objects.
    Do not call this routine directly, use Page.categories() instead.
    """
    import catlib
    result = []
    # Ignore category links within nowiki tags, pre tags, includeonly tags,
    # and HTML comments
    text = removeDisabledParts(text)
    catNamespace = '|'.join(site.category_namespaces())
    R = re.compile(r'\[\[\s*(?P<namespace>%s)\s*:\s*(?P<catName>.+?)'
                   r'(?:\|(?P<sortKey>.+?))?\s*\]\]' % catNamespace, re.I)
    for match in R.finditer(text):
        cat = catlib.Category(site,
                              '%s:%s' % (match.group('namespace'),
                                         match.group('catName')),
                              sortKey = match.group('sortKey'))
        result.append(cat)
    return result
def removeCategoryLinks(text, site, marker = ''):
"""Return text with all category links removed.
Put the string marker after the last replacement (at the end of the text if there is no replacement).
""" # This regular expression will find every link that is possibly an # interwiki link, plus trailing whitespace. The language code is grouped. # NOTE: This assumes that language codes only consist of non-capital # ASCII letters and hyphens. catNamespace = '|'.join(site.category_namespaces()) categoryR = re.compile(r'\[\[\s*(%s)\s*:.*?\]\]\s*' % catNamespace, re.I) text = replaceExcept(text, categoryR, , ['nowiki', 'comment', 'math', 'pre', 'source'], marker = marker) if marker: #avoid having multiple linefeeds at the end of the text text = re.sub('\s*%s' % re.escape(marker), '\r\n' + marker, text.strip()) return text.strip()
def removeCategoryLinksAndSeparator(text, site = None, marker = '', separator = ''):
    """Return text with all category links, plus any preceding
    whitespace and separator occurrences, removed.
Put the string marker after the last replacement (at the end of the text if there is no replacement).
""" if separator: mymarker = findmarker(text, u'@C@') newtext = removeCategoryLinks(text, site, mymarker) mymarker = expandmarker(newtext, mymarker, separator) return newtext.replace(mymarker, marker) else: return removeCategoryLinks(text, site, marker)
def replaceCategoryInPlace(oldtext, oldcat, newcat, site=None):
"""Replace the category oldcat with the category newcat and return the modified text.
""" if site is None: site = getSite()
    catNamespace = '|'.join(site.category_namespaces())
    title = oldcat.titleWithoutNamespace()
    if not title:
        return
    # title might contain regex special characters
    title = re.escape(title)
    # title might not be capitalized correctly on the wiki
    if title[0].isalpha() and not site.nocapitalize:
        title = "[%s%s]" % (title[0].upper(), title[0].lower()) + title[1:]
    # spaces and underscores in page titles are interchangeable, and
    # collapsible
    title = title.replace(r"\ ", "[ _]+").replace(r"\_", "[ _]+")
    categoryR = re.compile(r'\[\[\s*(%s)\s*:\s*%s\s*((?:\|[^]]+)?\]\])'
                           % (catNamespace, title), re.I)
    if newcat is None:
        text = replaceExcept(oldtext, categoryR, '',
                             ['nowiki', 'comment', 'math', 'pre', 'source'])
    else:
        text = replaceExcept(oldtext, categoryR,
                             '[[%s:%s\\2' % (site.namespace(14),
                                             newcat.titleWithoutNamespace()),
                             ['nowiki', 'comment', 'math', 'pre', 'source'])
    return text
def replaceCategoryLinks(oldtext, new, site = None, addOnly = False):
"""Replace the category links given in the wikitext given in oldtext by the new links given in new.
'new' should be a list of Category objects.
If addOnly is True, the old category won't be deleted and the category(s) given will be added (and so they won't replace anything). """
    # Find a marker that is not already in the text.
    marker = findmarker(oldtext, u'@@')
    if site is None:
        site = getSite()
    if site.sitename() == 'wikipedia:de' and "{{Personendaten" in oldtext:
        raise Error('The PyWikipediaBot is no longer allowed to touch categories on the German Wikipedia on pages that contain the person data template because of the non-standard placement of that template. See http://de.wikipedia.org/wiki/Hilfe_Diskussion:Personendaten/Archiv/bis_2006#Position_der_Personendaten_am_.22Artikelende.22')
    separator = site.family.category_text_separator
    iseparator = site.family.interwiki_text_separator
    separatorstripped = separator.strip()
    iseparatorstripped = iseparator.strip()
    if addOnly:
        s2 = oldtext
    else:
        s2 = removeCategoryLinksAndSeparator(oldtext, site = site,
                                             marker = marker,
                                             separator = separatorstripped)
    s = categoryFormat(new, insite = site)
    if s:
        if site.language() in site.family.category_attop:
            newtext = s + separator + s2
        else:
            # calculate what was after the category links on the page
            firstafter = s2.find(marker)
            if firstafter < 0:
                firstafter = len(s2)
            else:
                firstafter += len(marker)
            # Is there any text in the 'after' part that means we should
            # keep it after?
            if "</noinclude>" in s2[firstafter:]:
                if separatorstripped:
                    s = separator + s
                newtext = s2[:firstafter].replace(marker, '') + s \
                          + s2[firstafter:]
            elif site.language() in site.family.categories_last:
                newtext = s2.replace(marker, '').strip() + separator + s
            else:
                interwiki = getLanguageLinks(s2)
                s2 = removeLanguageLinksAndSeparator(
                         s2.replace(marker, ''), site, '',
                         iseparatorstripped) + separator + s
                newtext = replaceLanguageLinks(s2, interwiki, site = site,
                                               addOnly = True)
    else:
        newtext = s2.replace(marker, '')
    return newtext.strip()
def categoryFormat(categories, insite = None):
"""Return a string containing links to all categories in a list.
'categories' should be a list of Category objects.
    The string is formatted for inclusion in insite.
    """
    if not categories:
        return u''
    if insite is None:
        insite = getSite()
    catLinks = [category.aslink(noInterwiki = True)
                for category in categories]
    if insite.category_on_one_line():
        sep = ' '
    else:
        sep = '\r\n'
    # Some people don't like the categories sorted
    #catLinks.sort()
    return sep.join(catLinks) + '\r\n'
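# Hypothetical round trip for category handling: parse the category links,
# append one, and render them back to wikitext with categoryFormat().
def _categoryFormat_example(page):
    import catlib
    site = page.site()
    cats = getCategoryLinks(page.get(), site)
    cats.append(catlib.Category(site, u'Category:Examples'))
    return categoryFormat(cats, insite = site)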
# end of category specific code
def url2link(percentname, insite, site):
"""Convert urlname of a wiki page into interwiki link format.
'percentname' is the page title as given by Page.urlname(); 'insite' specifies the target Site; 'site' is the Site on which the page is found.
""" # Note: this is only needed if linking between wikis that use different # encodings, so it is now largely obsolete. [CONFIRM] percentname = percentname.replace('_', ' ') x = url2unicode(percentname, site = site) return unicode2html(x, insite.encoding())
def decodeEsperantoX(text):
""" Decode Esperanto text encoded using the x convention.
E.g., Cxefpagxo and CXefpagXo will both be converted to Ĉefpaĝo. Note that to encode non-Esperanto words like Bordeaux, one uses a double x, i.e. Bordeauxx or BordeauxX.
""" chars = { u'c': u'ĉ', u'C': u'Ĉ', u'g': u'ĝ', u'G': u'Ĝ', u'h': u'ĥ', u'H': u'Ĥ', u'j': u'ĵ', u'J': u'Ĵ', u's': u'ŝ', u'S': u'Ŝ', u'u': u'ŭ', u'U': u'Ŭ', } for latin, esperanto in chars.iteritems(): # A regular expression that matches a letter combination which IS # encoded using x-convention. xConvR = re.compile(latin + '[xX]+') pos = 0 result = # Each matching substring will be regarded exactly once. while True: match = xConvR.search(text[pos:]) if match: old = match.group() if len(old) % 2 == 0: # The first two chars represent an Esperanto letter. # Following x's are doubled. new = esperanto + .join([old[2 * i] for i in range(1, len(old)/2)]) else: # The first character stays latin; only the x's are doubled. new = latin + .join([old[2 * i + 1] for i in range(0, len(old)/2)]) result += text[pos : match.start() + pos] + new pos += match.start() + len(old) else: result += text[pos:] text = result break return text
def encodeEsperantoX(text):
""" Convert standard wikitext to the Esperanto x-encoding.
    Double X-es where necessary so that we can submit a page to an Esperanto
    wiki. Again, we have to keep stupid stuff like cXxXxxX in mind. Maybe
    someone wants to write about the Sony Cyber-shot DSC-Uxx camera series
    on eo: ;)
    """
    # A regular expression that matches a letter combination which is NOT
    # encoded in x-convention.
    notXConvR = re.compile('[cghjsuCGHJSU][xX]+')
    pos = 0
    result = u''
    while True:
        match = notXConvR.search(text[pos:])
        if match:
            old = match.group()
            # the first letter stays; add an x after each X or x.
            new = old[0] + ''.join([old[i] + 'x'
                                    for i in range(1, len(old))])
            result += text[pos : match.start() + pos] + new
            pos += match.start() + len(old)
        else:
            result += text[pos:]
            text = result
            break
    return text
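# Round-trip sketch for the two Esperanto helpers above (plain illustration,
# no site needed): an Esperanto digraph is collapsed on decoding, and an 'x'
# after a convertible letter is doubled on encoding.
def _esperantoX_example():
    decoded = decodeEsperantoX(u'Cxefpagxo')   # -> u'Ĉefpaĝo'
    encoded = encodeEsperantoX(u'Bordeaux')    # -> u'Bordeauxx'
    return decoded, encoded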
def sectionencode(text, encoding):
"""Encode text so that it can be used as a section title in wiki-links.""" return urllib.quote(text.replace(" ","_").encode(encoding)).replace("%",".")
######## Unicode library functions ########
def UnicodeToAsciiHtml(s):
"""Convert unicode to a bytestring using HTML entities.""" html = [] for c in s: cord = ord(c) if 31 < cord < 128: html.append(c) else: html.append('&#%d;'%cord) return .join(html)
def url2unicode(title, site, site2 = None):
"""Convert url-encoded text to unicode using site's encoding.
If site2 is provided, try its encodings as well. Uses the first encoding that doesn't cause an error.
""" # create a list of all possible encodings for both hint sites encList = [site.encoding()] + list(site.encodings()) if site2 and site2 <> site: encList.append(site2.encoding()) encList += list(site2.encodings()) firstException = None # try to handle all encodings (will probably retry utf-8) for enc in encList: try: t = title.encode(enc) t = urllib.unquote(t) return unicode(t, enc) except UnicodeError, ex: if not firstException: firstException = ex pass # Couldn't convert, raise the original exception raise firstException
def unicode2html(x, encoding):
""" Ensure unicode string is encodable, or else convert to ASCII for HTML.
Arguments are a unicode string and an encoding. Attempt to encode the string into the desired format; if that doesn't work, encode the unicode into html &#; entities. If it does work, return it unchanged.
""" try: x.encode(encoding) except UnicodeError: x = UnicodeToAsciiHtml(x) return x
def html2unicode(text, ignore = []):
"""Replace all HTML entities in text by equivalent unicode characters.""" # This regular expression will match any decimal and hexadecimal entity and # also entities that might be named entities. entityR = re.compile(r'&(#(?P<decimal>\d+)|#x(?P<hex>[0-9a-fA-F]+)|(?P<name>[A-Za-z]+));') #These characters are Html-illegal, but sadly you *can* find some of these and #converting them to unichr(decimal) is unsuitable convertIllegalHtmlEntities = { 128 : 8364, # € 130 : 8218, # ‚ 131 : 402, # ƒ 132 : 8222, # „ 133 : 8230, # … 134 : 8224, # † 135 : 8225, # ‡ 136 : 710, # ˆ 137 : 8240, # ‰ 138 : 352, # Š 139 : 8249, # ‹ 140 : 338, # Œ 142 : 381, # Ž 145 : 8216, # ‘ 146 : 8217, # ’ 147 : 8220, # “ 148 : 8221, # ” 149 : 8226, # • 150 : 8211, # – 151 : 8212, # — 152 : 732, # ˜ 153 : 8482, # ™ 154 : 353, # š 155 : 8250, # › 156 : 339, # œ 158 : 382, # ž 159 : 376 # Ÿ } #ensuring that illegal   and , which have no known values, #don't get converted to unichr(129), unichr(141) or unichr(157) ignore = set(ignore) | set([129, 141, 157]) result = u i = 0 found = True while found: text = text[i:] match = entityR.search(text) if match: unicodeCodepoint = None if match.group('decimal'): unicodeCodepoint = int(match.group('decimal')) elif match.group('hex'): unicodeCodepoint = int(match.group('hex'), 16) elif match.group('name'): name = match.group('name') if name in htmlentitydefs.name2codepoint: # We found a known HTML entity. unicodeCodepoint = htmlentitydefs.name2codepoint[name] result += text[:match.start()] try: unicodeCodepoint=convertIllegalHtmlEntities[unicodeCodepoint] except KeyError: pass if unicodeCodepoint and unicodeCodepoint not in ignore and (WIDEBUILD or unicodeCodepoint < 65534): result += unichr(unicodeCodepoint) else: # Leave the entity unchanged result += text[match.start():match.end()] i = match.end() else: result += text found = False return result
# Warning! _familyCache does not necessarily have to be consistent between
# two statements. Always ensure that a local reference is created when
# accessing Family objects.
_familyCache = weakref.WeakValueDictionary() def Family(fam = None, fatal = True, force = False):
""" Import the named family.
If fatal is True, the bot will stop running when the given family is unknown. If fatal is False, it will only raise a ValueError exception. """ if fam is None: fam = config.family
family = _familyCache.get(fam) if family and not force: return family
try: # search for family module in the 'families' subdirectory sys.path.append(config.datafilepath('families')) myfamily = __import__('%s_family' % fam) except ImportError: if fatal: output(u"""\
Error importing the %s family. This probably means the family does not exist. Also check your configuration file."""
% fam) import traceback traceback.print_stack() sys.exit(1) else: raise ValueError("Family %s does not exist" % repr(fam))
family = myfamily.Family() _familyCache[fam] = family return family
class Site(object):
"""A MediaWiki site. Do not instantiate directly; use getSite() function.
Constructor takes four arguments; only code is mandatory:
code language code for Site fam Wiki family (optional: defaults to configured). Can either be a string or a Family object. user User to use (optional: defaults to configured) persistent_http Use a persistent http connection. An http connection has to be established only once, making stuff a whole lot faster. Do NOT EVER use this if you share Site objects across threads without proper locking.
Methods:
language: This Site's language code. family: This Site's Family object. sitename: A string representing this Site. languages: A list of all languages contained in this site's Family. validLanguageLinks: A list of language codes that can be used in interwiki links.
loggedInAs: return current username, or None if not logged in. forceLogin: require the user to log in to the site messages: return True if there are new messages on the site cookies: return user's cookies as a string
getUrl: retrieve an URL from the site urlEncode: Encode a query to be sent using an http POST request. postForm: Post form data to an address at this site. postData: Post encoded form data to an http address at this site.
namespace(num): Return local name of namespace 'num'. normalizeNamespace(value): Return preferred name for namespace 'value' in this Site's language. namespaces: Return list of canonical namespace names for this Site. getNamespaceIndex(name): Return the int index of namespace 'name', or None if invalid.
redirect: Return the localized redirect tag for the site. redirectRegex: Return compiled regular expression matching on redirect pages. mediawiki_message: Retrieve the text of a specified MediaWiki message has_mediawiki_message: True if this site defines specified MediaWiki message
    shared_image_repository: Return tuple of image repositories used by this
        site.
    category_on_one_line: Return True if this site wants all category links
        on one line.
    interwiki_putfirst: Return list of language codes for ordering of
        interwiki links.
    linkto(title): Return string in the form of a wikilink to 'title'
    isInterwikiLink(s): Return True if 's' is in the form of an interwiki
        link.
    getSite(lang): Return Site object for wiki in same family, language
        'lang'.
    version: Return MediaWiki version string from Family file.
    versionnumber: Return int identifying the MediaWiki version.
    live_version: Return version number read from Special:Version.
    checkCharset(charset): Warn if charset doesn't match family file.
    server_time: Return server time (currently derived from the user's
        clock).
linktrail: Return regex for trailing chars displayed as part of a link. disambcategory: Category in which disambiguation pages are listed.
Methods that yield Page objects derived from a wiki's Special: pages (note, some methods yield other information in a tuple along with the Pages; see method docs for details) --
    search(query): query results from Special:Search
    allpages(): Special:Allpages
    prefixindex(): Special:Prefixindex
    protectedpages(): Special:ProtectedPages
    newpages(): Special:Newpages
    newimages(): Special:Log&type=upload
    longpages(): Special:Longpages
    shortpages(): Special:Shortpages
    categories(): Special:Categories (yields Category objects)
    deadendpages(): Special:Deadendpages
    ancientpages(): Special:Ancientpages
    lonelypages(): Special:Lonelypages
    recentchanges(): Special:Recentchanges
    unwatchedpages(): Special:Unwatchedpages (sysop accounts only)
    uncategorizedcategories(): Special:Uncategorizedcategories (yields
        Category objects)
    uncategorizedpages(): Special:Uncategorizedpages
    uncategorizedimages(): Special:Uncategorizedimages (yields
        ImagePage objects)
    unusedcategories(): Special:Unusedcategories (yields Category)
    unusedfiles(): Special:Unusedimages (yields ImagePage)
    randompage: Special:Random
    randomredirectpage: Special:RandomRedirect
    withoutinterwiki: Special:Withoutinterwiki
    linksearch: Special:Linksearch
Convenience methods that provide access to properties of the wiki Family object; all of these are read-only and return a unicode string unless noted --
encoding: The current encoding for this site. encodings: List of all historical encodings for this site. category_namespace: Canonical name of the Category namespace on this site. category_namespaces: List of all valid names for the Category namespace. image_namespace: Canonical name of the Image namespace on this site. template_namespace: Canonical name of the Template namespace on this site. protocol: Protocol ('http' or 'https') for access to this site. hostname: Host portion of site URL. path: URL path for index.php on this Site. dbName: MySQL database name.
Methods that return addresses to pages on this site (usually in Special: namespace); these methods only return URL paths, they do not interact with the wiki --
    export_address: Special:Export.
    query_address: URL path + '?' for query.php
    api_address: URL path + '?' for api.php
    apipath: URL path for api.php
    move_address: Special:Movepage.
    delete_address(s): Delete title 's'.
    undelete_view_address(s): Special:Undelete for title 's'
    undelete_address: Special:Undelete.
    protect_address(s): Protect title 's'.
    unprotect_address(s): Unprotect title 's'.
    put_address(s): Submit revision to page titled 's'.
    get_address(s): Retrieve page titled 's'.
    nice_get_address(s): Short URL path to retrieve page titled 's'.
    edit_address(s): Edit form for page titled 's'.
    purge_address(s): Purge cache and retrieve page 's'.
    block_address: Block an IP address.
    unblock_address: Unblock an IP address.
    blocksearch_address(s): Search for blocks on IP address 's'.
    linksearch_address(s): Special:Linksearch for target 's'.
    search_address(q): Special:Search for query 'q'.
    allpages_address(s): Special:Allpages.
    newpages_address: Special:Newpages.
    longpages_address: Special:Longpages.
    shortpages_address: Special:Shortpages.
    unusedfiles_address: Special:Unusedimages.
    categories_address: Special:Categories.
    deadendpages_address: Special:Deadendpages.
    ancientpages_address: Special:Ancientpages.
    lonelypages_address: Special:Lonelypages.
    protectedpages_address: Special:ProtectedPages
    unwatchedpages_address: Special:Unwatchedpages.
    uncategorizedcategories_address: Special:Uncategorizedcategories.
    uncategorizedimages_address: Special:Uncategorizedimages.
    uncategorizedpages_address: Special:Uncategorizedpages.
    unusedcategories_address: Special:Unusedcategories.
    withoutinterwiki_address: Special:Withoutinterwiki.
    references_address(s): Special:Whatlinkshere for page 's'.
    allmessages_address: Special:Allmessages.
    upload_address: Special:Upload.
    double_redirects_address: Special:Doubleredirects.
    broken_redirects_address: Special:Brokenredirects.
    random_address: Special:Random.
    randomredirect_address: Special:RandomRedirect.
    login_address: Special:Userlogin.
    captcha_image_address(id): Special:Captcha for image 'id'.
    watchlist_address: Special:Watchlist editor.
    contribs_address(target): Special:Contributions for user 'target'.
""" def __init__(self, code, fam=None, user=None, persistent_http = None): self.lang = code.lower() if isinstance(fam, basestring) or fam is None: self.family = Family(fam, fatal = False) else: self.family = fam
# if we got an outdated language code, use the new one instead. if self.lang in self.family.obsolete: if self.family.obsolete[self.lang] is not None: self.lang = self.family.obsolete[self.lang] else: # no such language anymore raise NoSuchSite("Language %s in family %s is obsolete" % (self.lang, self.family.name))
if self.lang not in self.languages(): if self.lang == 'zh-classic' and 'zh-classical' in self.languages(): self.lang = 'zh-classical' # ev0l database hack (database is varchar[10] -> zh-classical is cut to zh-classic. elif self.family.name in self.family.langs.keys() or len(self.family.langs) == 1: self.lang = self.family.name else: raise NoSuchSite("Language %s does not exist in family %s"%(self.lang,self.family.name))
self._mediawiki_messages = {} self.nocapitalize = self.lang in self.family.nocapitalize self.user = user self._userData = [False, False] self._userName = [None, None] self._isLoggedIn = [None, None] self._isBlocked = [None, None] self._messages = [None, None] self._rights = [None, None] self._token = [None, None] self._cookies = [None, None] # Calculating valid languages took quite long, so we calculate it once # in initialization instead of each time it is used. self._validlanguages = [] for language in self.languages(): if not language[0].upper() + language[1:] in self.namespaces(): self._validlanguages.append(language)
#if persistent_http is None: # persistent_http = config.persistent_http #self.persistent_http = persistent_http and self.protocol() in ('http', 'https') #if persistent_http: # if self.protocol() == 'http': # self.conn = httplib.HTTPConnection(self.hostname()) # elif self.protocol() == 'https': # self.conn = httplib.HTTPSConnection(self.hostname()) self.persistent_http = False
def _userIndex(self, sysop = False): """Returns the internal index of the user.""" if sysop: return 1 else: return 0 def username(self, sysop = False): return self._userName[self._userIndex(sysop = sysop)]
def loggedInAs(self, sysop = False): """Return the current username if logged in, otherwise return None.
Checks if we're logged in by loading a page and looking for the login link. We assume that we're not being logged out during a bot run, so loading the test page is only required once.
""" index = self._userIndex(sysop) if self._isLoggedIn[index] is None: # Load the details only if you don't know the login status. # Don't load them just because the other details aren't known. self._load(sysop = sysop) if self._isLoggedIn[index]: return self._userName[index] else: return None
def forceLogin(self, sysop = False): """Log the user in if not already logged in.""" if not self.loggedInAs(sysop = sysop): loginMan = login.LoginManager(site = self, sysop = sysop) #loginMan.logout() if loginMan.login(retry = True): index = self._userIndex(sysop) self._isLoggedIn[index] = True self._userName[index] = loginMan.username # We know nothing about the new user (but its name) # Old info is about the anonymous user self._userData[index] = False
def checkBlocks(self, sysop = False): """Check if the user is blocked, and raise an exception if so.""" self._load(sysop = sysop) index = self._userIndex(sysop) if self._isBlocked[index]: # User blocked raise UserBlocked('User is blocked in site %s' % self)
def isBlocked(self, sysop = False): """Check if the user is blocked.""" self._load(sysop = sysop) index = self._userIndex(sysop) if self._isBlocked[index]: # User blocked return True else: return False
def _getBlock(self, sysop = False): """Get user block data from the API.""" try: params = { 'action': 'query', 'meta': 'userinfo', 'uiprop': 'blockinfo', } data = query.GetData(params, self)['query']['userinfo'] return data.has_key('blockby') except NotImplementedError: return False
    def isAllowed(self, right, sysop = False):
        """Check if the user has a specific right.

        Among possible rights:
        * Actions: edit, move, delete, protect, upload
        * User levels: autoconfirmed, sysop, bot, empty string (always true)
        """
        if right == '' or right is None:
            return True
        else:
            self._load(sysop = sysop)
            index = self._userIndex(sysop)
            ##output('%s' % self._rights[index]) #for debug use
            return right in self._rights[index]
    def server_time(self):
        """Return a datetime object representing the server time."""
        # It currently depends on the user's clock.
        return self.family.server_time()
def messages(self, sysop = False): """Returns true if the user has new messages, and false otherwise.""" self._load(sysop = sysop) index = self._userIndex(sysop) return self._messages[index]
def cookies(self, sysop = False): """Return a string containing the user's current cookies.""" self._loadCookies(sysop = sysop) index = self._userIndex(sysop) return self._cookies[index]
def _loadCookies(self, sysop = False): """Retrieve session cookies for login""" index = self._userIndex(sysop) if self._cookies[index] is not None: return try: if sysop: try: username = config.sysopnames[self.family.name][self.lang] except KeyError: raise NoUsername("""\
You tried to perform an action that requires admin privileges, but you haven't entered your sysop name in your user-config.py. Please add sysopnames['%s']['%s']='name' to your user-config.py"""
% (self.family.name, self.lang)) else: username = config.usernames[self.family.name][self.lang] except KeyError: self._cookies[index] = None self._isLoggedIn[index] = False else: tmp = '%s-%s-%s-login.data' % ( self.family.name, self.lang, username) fn = config.datafilepath('login-data', tmp) if not os.path.exists(fn): self._cookies[index] = None self._isLoggedIn[index] = False else: f = open(fn) self._cookies[index] = '; '.join([x.strip() for x in f.readlines()]) f.close()
def urlEncode(self, query): """Encode a query so that it can be sent using an http POST request.""" if not query: return None if hasattr(query, 'iteritems'): iterator = query.iteritems() else: iterator = iter(query) l = [] wpEditToken = None for key, value in iterator: if isinstance(key, unicode): key = key.encode('utf-8') if isinstance(value, unicode): value = value.encode('utf-8') key = urllib.quote(key) value = urllib.quote(value) if key == 'wpEditToken': wpEditToken = value continue l.append(key + '=' + value)
# wpEditToken is explicitly added as last value. # If a premature connection abort occurs while putting, the server will # not have received an edit token and thus refuse saving the page if wpEditToken is not None: l.append('wpEditToken=' + wpEditToken) return '&'.join(l)
def solveCaptcha(self, data): if type(data) == dict: # API Mode result if data.has_key("captcha"): captype = data['captcha']['type'] id = data['captcha']['id'] if captype in ['simple', 'math', 'question']: answer = input('What is the answer to the captcha "%s" ?' % data['result']['captcha']['question']) elif captype == 'image': url = self.protocol() + '://' + self.hostname() + self.captcha_image_address(id) answer = ui.askForCaptcha(url) else: #no captcha id result, maybe ReCaptcha. raise CaptchaError('We have been prompted for a ReCaptcha, but pywikipedia does not yet support ReCaptchas') return {'id':id, 'answer':answer} return None else: captchaW = re.compile('<label for="wpCaptchaWord">(?P<question>[^<]*)</label>') captchaR = re.compile('<input type="hidden" name="wpCaptchaId" id="wpCaptchaId" value="(?P<id>\d+)" />') match = captchaR.search(data) if match: id = match.group('id') match = captchaW.search(data) if match: answer = input('What is the answer to the captcha "%s" ?' % match.group('question')) else: if not config.solve_captcha: raise CaptchaError(id) url = self.protocol() + '://' + self.hostname() + self.captcha_image_address(id) answer = ui.askForCaptcha(url) return {'id':id, 'answer':answer} Recaptcha = re.compile('<script type="text/javascript" src="http://api\.recaptcha\.net/[^"]*"></script>') if Recaptcha.search(data): raise CaptchaError('We have been prompted for a ReCaptcha, but pywikipedia does not yet support ReCaptchas') return None
def postForm(self, address, predata, sysop=False, cookies = None): """Post http form data to the given address at this site.
address - the absolute path without hostname. predata - a dict or any iterable that can be converted to a dict, containing keys and values for the http form. cookies - the cookies to send with the form. If None, send self.cookies
Return a (response, data) tuple, where response is the HTTP response object and data is a Unicode string containing the body of the response.
""" data = self.urlEncode(predata) try: if cookies: return self.postData(address, data, sysop=sysop, cookies=cookies) else: return self.postData(address, data, sysop=sysop, cookies=self.cookies(sysop = sysop)) except socket.error, e: raise ServerError(e)
def postData(self, address, data, contentType='application/x-www-form-urlencoded', sysop=False, compress=True, cookies=None): """Post encoded data to the given http address at this site.
address is the absolute path without hostname. data is an ASCII string that has been URL-encoded.
Returns a (response, data) tuple where response is the HTTP response object and data is a Unicode string containing the body of the response. """
# TODO: add the authenticate stuff here
if False: #self.persistent_http: conn = self.conn else: # Encode all of this into a HTTP request if self.protocol() == 'http': conn = httplib.HTTPConnection(self.hostname()) elif self.protocol() == 'https': conn = httplib.HTTPSConnection(self.hostname()) # otherwise, it will crash, as other protocols are not supported
conn.putrequest('POST', address) conn.putheader('Content-Length', str(len(data))) conn.putheader('Content-type', contentType) conn.putheader('User-agent', useragent) if cookies: conn.putheader('Cookie', cookies) if False: #self.persistent_http: conn.putheader('Connection', 'Keep-Alive') if compress: conn.putheader('Accept-encoding', 'gzip') conn.endheaders() conn.send(data)
# Prepare the return values # Note that this can raise network exceptions which are not # caught here. try: response = conn.getresponse() except httplib.BadStatusLine: # Blub. conn.close() conn.connect() return self.postData(address, data, contentType, sysop, compress, cookies)
data = response.read()
if compress and response.getheader('Content-Encoding') == 'gzip': data = decompress_gzip(data)
data = data.decode(self.encoding()) response.close()
if True: #not self.persistent_http: conn.close()
# If a wiki page, get user data self._getUserData(data, sysop = sysop)
return response, data
def getUrl(self, path, retry = None, sysop = False, data = None, compress = True, no_hostname = False, cookie_only=False, back_response=False): """ Low-level routine to get a URL from the wiki.
Parameters: path - The absolute path, without the hostname. retry - If True, retries loading the page when a network error occurs. sysop - If True, the sysop account's cookie will be used. data - An optional dict providing extra post request parameters. cookie_only - Only return the cookie the server sent us back
Returns the HTML text of the page converted to unicode. """
if retry is None: retry=config.retry_on_fail
if False: #self.persistent_http and not data: self.conn.putrequest('GET', path) self.conn.putheader('User-agent', useragent) self.conn.putheader('Cookie', self.cookies(sysop = sysop)) self.conn.putheader('Connection', 'Keep-Alive') if compress: self.conn.putheader('Accept-encoding', 'gzip') self.conn.endheaders()
# Prepare the return values # Note that this can raise network exceptions which are not # caught here. try: response = self.conn.getresponse() except httplib.BadStatusLine: # Blub. self.conn.close() self.conn.connect() return self.getUrl(path, retry, sysop, data, compress, back_response=back_response)
text = response.read() headers = dict(response.getheaders())
else: if self.hostname() in config.authenticate.keys(): uo = authenticateURLopener else: uo = MyURLopener() if self.cookies(sysop = sysop): uo.addheader('Cookie', self.cookies(sysop = sysop)) if compress: uo.addheader('Accept-encoding', 'gzip') if no_hostname == True: # This allow users to parse also toolserver's script url = path # and other useful pages without using some other functions. else: url = '%s://%s%s' % (self.protocol(), self.hostname(), path) data = self.urlEncode(data)
# Try to retrieve the page until it was successfully loaded (just in # case the server is down or overloaded). # Wait for retry_idle_time minutes (growing!) between retries. retry_idle_time = 1 retrieved = False while not retrieved: try: if self.hostname() in config.authenticate.keys(): request = urllib2.Request(url, data) request.add_header('User-agent', useragent) opener = urllib2.build_opener() f = opener.open(request) else: f = uo.open(url, data)
# read & info can raise socket.error text = f.read() headers = f.info()
retrieved = True except KeyboardInterrupt: raise except Exception, e: if retry: # We assume that the server is down. Wait some time, then try again. output(u"%s" % e) output(u"""\
WARNING: Could not open '%s'. Maybe the server or your connection is down. Retrying in %i minutes..."""
% (url, retry_idle_time)) time.sleep(retry_idle_time * 60) # Next time wait longer, but not longer than half an hour retry_idle_time *= 2 if retry_idle_time > 30: retry_idle_time = 30 else: raise
        if cookie_only:
            return headers.get('set-cookie', '')
        contentType = headers.get('content-type', '')
        contentEncoding = headers.get('content-encoding', '')
# Ensure that all sent data is received if int(headers.get('content-length', '0')) != len(text) and 'content-length' in headers: output(u'Warning! len(text) does not match content-length: %s != %s' % \ (len(text), headers.get('content-length'))) if False: #self.persistent_http self.conn.close() self.conn.connect() return self.getUrl(path, retry, sysop, data, compress, back_response=back_response)
if compress and contentEncoding == 'gzip': text = decompress_gzip(text)
R = re.compile('charset=([^\'\";]+)') m = R.search(contentType) if m: charset = m.group(1) else: if verbose: output(u"WARNING: No character set found.") # UTF-8 as default charset = 'utf-8' # Check if this is the charset we expected self.checkCharset(charset) # Convert HTML to Unicode try: text = unicode(text, charset, errors = 'strict') except UnicodeDecodeError, e: print e if no_hostname: output(u'ERROR: Invalid characters found on %s, replaced by \\ufffd.' % path) else: output(u'ERROR: Invalid characters found on %s://%s%s, replaced by \\ufffd.' % (self.protocol(), self.hostname(), path)) # We use error='replace' in case of bad encoding. text = unicode(text, charset, errors = 'replace')
# If a wiki page, get user data self._getUserData(text, sysop = sysop)
if back_response: return response, text else: return text
def _getUserData(self, text, sysop = False, force = True): """ Get the user data from a wiki page data.
Parameters: * text - the page text * sysop - is the user a sysop? """
index = self._userIndex(sysop)
if type(text) == dict: #text is dict, query from API # Check for blocks if text.has_key('blockedby') and not self._isBlocked[index]: # Write a warning if not shown earlier if sysop: account = 'Your sysop account' else: account = 'Your account' output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self)) self._isBlocked[index] = text.has_key('blockedby')
# Check for new messages, the data must had key 'messages' in dict. if text.has_key('messages'): if not self._messages[index]: # User has *new* messages if sysop: output(u'NOTE: You have new messages in your sysop account on %s' % self) else: output(u'NOTE: You have new messages on %s' % self) self._messages[index] = True else: self._messages[index] = False
# Don't perform other checks if the data was already loaded if self._userData[index] and not force: return
# Get username. # The data in anonymous mode had key 'anon' # if 'anon' exist, username is IP address, not to collect it right now if not text.has_key('anon'): self._isLoggedIn[index] = True self._userName[index] = text['name'] else: self._isLoggedIn[index] = False self._userName[index] = None
# Get user groups and rights if text.has_key('groups') and text['groups'] != []: self._rights[index] = text['groups'] self._rights[index].extend(text['rights']) # Warnings # Don't show warnings for not logged in users, they will just fail to # do any action if self._isLoggedIn[index]: if 'bot' not in self._rights[index] and config.notify_unflagged_bot: # Sysop + bot flag = Sysop flag in MediaWiki < 1.7.1? if sysop: output(u'Note: Your sysop account on %s does not have a bot flag. Its edits will be visible in the recent changes.' % self) else: output(u'WARNING: Your account on %s does not have a bot flag. Its edits will be visible in the recent changes and it may get blocked.' % self) if sysop and 'sysop' not in self._rights[index]: output(u'WARNING: Your sysop account on %s does not seem to have sysop rights. You may not be able to perform any sysop-restricted actions using it.' % self) else: # 'groups' is not exists, set default rights self._rights[index] = [] if self._isLoggedIn[index]: # Logged in user self._rights[index].append('user') # Assume bot, and thus autoconfirmed self._rights[index].extend(['bot', 'autoconfirmed']) if sysop: # Assume user reported as a sysop indeed has the sysop rights self._rights[index].append('sysop') # Assume the user has the default rights if API not query back self._rights[index].extend(['read', 'createaccount', 'edit', 'upload', 'createpage', 'createtalk', 'move', 'upload']) #remove Duplicate rights self._rights[index] = list(set(self._rights[index]))
# Get token if text.has_key('preferencestoken'): self._token[index] = text['preferencestoken'] if self._rights[index] is not None: # Token and rights are loaded - user data is now loaded self._userData[index] = True else: if not self._isBlocked[index]: output(u'WARNING: Token not found on %s. You will not be able to edit any page.' % self) else: #ordinary mode to get data from edit page HTMLs and JavaScripts
            if '<div id="globalWrapper">' not in text:
                # Not a wiki page
                return
            # Check for blocks - but only if version is 1.11 (userinfo is
            # available) and the user data was not yet loaded
            if self.versionnumber() >= 11 and (not self._userData[index] or force):
                blocked = self._getBlock(sysop = sysop)
                if blocked and not self._isBlocked[index]:
                    # Write a warning if not shown earlier
                    if sysop:
                        account = 'Your sysop account'
                    else:
                        account = 'Your account'
                    output(u'WARNING: %s on %s is blocked. Editing using this account will stop the run.' % (account, self))
                self._isBlocked[index] = blocked

            # Check for new messages
            if '<div class="usermessage">' in text: