AstroGrid

Navigation
Log in


Forgot your password?
 

This website (www2.astrogrid.org) is now deprecated - please go to www.astrogrid.org for up to date information.

Document Actions

VOTable Python Module

Click here to get the file

Size 10.1 kB - File type text/python-source

File contents

"""	Class to handle VOTable
	Created: 2005-05-31 by Shui Hung Kwok, shkwok at computer.org
	Edited: 2007-02-12 by Noel Winstanley, noel.winstanley at manchester.ac.uk
		- added ability to parse strings as votable-xml
	See http://www.ivoa.net/Documents/latest/VOT.html .
	
"""

import sys
from types import *
import xml.sax
import xml.sax.handler 
import StringIO

class VONode (object):
	"""	Class representing an XML node of a VOTable
	""" 
	def __init__ (self, tagname=('','')):
		self._tagname = tagname
		self._nodeList = []
		self._attributes = {}

	def addNode (self, node):
		self._nodeList.append (node)
		if not isinstance (node, (StringType, UnicodeType)):
			name = node.getNamePart ()
			try:
				val = self.__dict__[name]
				if isinstance (val, ListType):
					val.append (node)
				else:
					self.__dict__[name] = [val, node]
			except:
				self.__dict__[name] = node
		else:
			self.content = node

	def addAttribute (self, attr):
		name,value = attr
		self._attributes[name] = value
		self.__dict__[name] = value

	def addAttributes (self, attrs):
		self._attributes.update (attrs)
		for k,v in attrs.items ():
			self.__dict__[k[1]] = v

	def __getItem__ (self, idx):
		return self._nodeList[idx].getContent ()

	def getAttribute (self, name):
		""" Returns attribute by name or '' if not found """
		return self._attributes.get (name)

	def getAttributes (self):
		""" Returns all attributes.
		"""
		res = {}
		for (ns,n),at in self._attributes.items():
			res[n] = at
		return res

	def getNodeList (self):
		""" Returns a list of nodes that are of type VONode
		"""
		res = []
		for node in self._nodeList:
			try:
				l = node._nodeList
				res.append (node)
			except Exception, e:
				pass
		return res		

	def getContent (self):
		""" Returns all strings of the node.
		"""
		res = []
		for node in self._nodeList:
			try:
				l = node.lower ()
				res.append (node)
			except Exception, e:
				pass
		return ''.join (res)
	
	def getNamePart (self):
		try:
			ns, n = self._tagname
			return n
		except:
			return n
			
	def getNodesByName (self, look4):
		""" Returns a list of nodes whose tagname = look4
		"""
		res = []
		for node in self._nodeList:
			try:
				if look4 != node.getNamePart ():
					continue
				l = node._nodeList
				res.append (node)
			except Exception, e:
				pass
		return res		

	def __str__ (self):
		try:
			return self.content
		except:
			return self.buildName (self._tagname)

	def getNode (self, path):
		"""	Returns a node for a given path.
			Path is of the form /tag1/tag2/tag3.
			Path can include array index, like /tag1/tag2[3]/tag4.
		"""
		node = self
		children = []
		pathArray = path.split ("/")

		rootName = self.getNamePart ()
		if rootName != pathArray[1]:
			return None

		pathArray = pathArray[2:]
		for elem in pathArray:
			tmp = elem.replace ('[', ']')
			list = tmp.split (']')
			name = list[0]
			if len (list) > 1:
				idx = int (list[1])
			else:
				idx = 0
			children = node.getNodesByName (name)
			nr = len (children)
			if idx >= nr:
				return None
			node = children [idx]
		return node	

	def getNodesByPath (self, path):
		"""	Returns an array of VONodes for a given path.
			Path is of the form /tag1/tag2/tag3.
			Path can include array index, like /tag1/tag2[3]/tag4.
		"""
		node = self
		children = []
		pathArray = path.split ("/")

		rootName = self.getNamePart ()
		if rootName != pathArray[1]:
			return None

		pathArray = pathArray[2:]
		for elem in pathArray:
			tmp = elem.replace ('[', ']')
			list = tmp.split (']')
			name = list[0]
			if len (list) > 1:
				idx = int (list[1])
			else:
				idx = 0
			children = node.getNodesByName (name)
			nr = len (children)
			if idx >= nr:
				return None
			node = children [idx]
		return children	

	def buildName (self, tname):
		""" Returns a name with namespace as prefix
			or just name if no namespace
			Note that the prefix is the real namespace
			and not the abbreviation used in the original XML
		"""
		ns,n = tname
		"""
		if ns:
			return "%s:%s" % (self.qname, n)
		else: 
			return n
		"""
		return n

	def printAllNodes (self, func=sys.stdout.write, prefix=''):
		"""	Recursive method to visit all nodes of the tree
			and calls the provided function to output the content.
		"""
		func ("%s<%s" % (prefix, self.buildName (self._tagname)))
		for ns,v in self._attributes.items ():
			func (" %s='%s'" % (self.buildName ((ns)), v))
		func (">")
		
		last = 0
		for n in self._nodeList:
			if isinstance (n, (StringType, UnicodeType)):
				if last == 2:
					func ("\n%s" % prefix)
				func ("%s" % n)
				last = 1
			else:
				if last <= 1:
					func ("\n")
				n.printAllNodes (func, prefix + '   ')
				last = 2
		if last <= 1:
			func ("</%s>\n" % self.buildName (self._tagname))
		else:
			func ("%s</%s>\n" % (prefix, self.buildName (self._tagname)))
		
class VOXMLHandler (xml.sax.handler.ContentHandler):
	""" Class implementing callbacks for the SAX parser.
	"""
	def __init__ (self, vonode=VONode):
		# Create a parser
		xml.sax.handler.ContentHandler.__init__ (self)
		self.parser = xml.sax.make_parser()
		self.parser.setFeature (xml.sax.handler.feature_namespaces, 1)
		self.parser.setContentHandler (self)
		self.vonode = vonode
		self.sentinel = vonode ()
		self.currNode = self.sentinel
		self.stack = []

	def startElementNS (self, (urn, name), qname, attrs):
		#print "start ", name
		self.stack.append (self.currNode)

		self.currNode = self.vonode ((urn, name))
		self.currNode.addAttributes (attrs)

	def characters (self, chunk):
		buf = chunk.strip ()
		if len (buf) == 0: return
		self.currNode.addNode (buf)

	def endElementNS (self, (urn, name), qname):
		#print "end ", name
		newNode = self.currNode
		self.currNode = self.stack.pop ()
		self.currNode.addNode (newNode)

	def parse (self, source):
		""" Main entry point.
			Source can be URL or file name.
		"""
		self.parser.parse (source)
		return self.sentinel 
	#NWW
	def parseText(self,t):
		""" Alternate entry point. Text is some xml to be parsed"""
		inputSource = xml.sax.xmlreader.InputSource()
		inputSource.setByteStream(StringIO.StringIO(t))
		self.parser.parse(inputSource)
		return self.sentinel
	
class VOXML (object):
	"""	Class to implement XML parser
	"""
	def __init__ (self, source=None, vonode=VONode):
		""" Instantiate a VOXML
			source can be URL, file name or a string representing the VOXML.
			vonode is a class representing VONode, must be derived from or
			compatible with VONode.
		"""
		self.vonode = vonode 
		self.root = None
		if source != None:
			self.parse (source)

	def parse (self, source):
		""" Invokes XML parser and stores VOXML
			in self.root as VONode.
		"""
		parser = VOXMLHandler (self.vonode)
		self.root = parser.parse (source)
	#NWW
	def parseText(self,text):
		""" Invokes XML parser on inlined xml text"""
		parser = VOXMLHandler(self.vonode)
		self.root = parser.parseText(text)

	def printAllNodes (self, func=sys.stdout.write):	
		"""	Output entire content as XML.
			func is the output method, defined as:
				func (outString)
		"""
		node = self.root._nodeList[0] 
		node.printAllNodes (func)

	def getNode (self, path):
		""" Returns a VONode of the given path.
		"""
		return self.root._nodeList[0].getNode (path)

	def getContent (self, path):
		""" Returns the content of a node.
			Only strings are returned.
		"""
		node = self.getNode (path)
		return node.getContent ()

class VOTable (VOXML):
	""" Implementation of VOTable 
	"""

	def __init__ (self, source=None, vonode=VONode):
		""" Contruct an instance of VOTable
		"""
		VOXML.__init__ (self, source, vonode)

	def getColumnIdx (self, val):
		""" Returns the column index for the given name
			Will return any attribute value matching val.
		"""
		fields = self.getFields ()
		for coln, f in enumerate (fields):
			if val in f._attributes.values ():
				return coln
		return -1
		
	def getFields (self):
		""" Returns a list of VONode representing all the fields
		"""
		#table = self.root.VOTABLE.RESOURCE.TABLE
		#return table.getNodesByName ('FIELD')
		return self.root.VOTABLE.RESOURCE.TABLE.FIELD
	
	def getParams (self):
		"""	Returns a list of VONode representing all PARAMS
		"""
		return self.root.VOTABLE.RESOURCE.RESOURCE.PARAM

	def getFieldsAttrs (self):
		"""	Returns a list of maps that contains attributes.
			Returned list looks like this: [{},{},...]
		"""
		res = []
		fields = self.getFields ()
		for elem in fields:
			try:
				res.append (elem.getAttributes())
			except:
				pass
		return res
	
	def getDataRows (self):
		"""	Returns a list of VONodes representing rows of the table.
			Use getData () to extract data from each row.
			for x in getDataRows ():
				data = getData (x)
				#data = [values ...]
		"""
		tableData = self.root.VOTABLE.RESOURCE.TABLE.DATA.TABLEDATA
		return tableData._nodeList

	def getData (self, row):
		""" row is a VONode <TR> parent of a list of <TD>.
			Returns a list of values.
		"""
		res = []
		list = row._nodeList
		for elm in list:
			try:
				res.append (elm.getContent ())
			except:
				res.append ('')
		return res	
	
	def append (self, vot):
		""" Appends votable vot to the end of this VOTable. 
			No tests to see if fields are the same. 
			vot must have the same fields. 
		"""
		try:
			node1 = self.root.VOTABLE.RESOURCE.TABLE.DATA.TABLEDATA
		except:
			node1 = None
		try:
			node2 = vot.root.VOTABLE.RESOURCE.TABLE.DATA.TABLEDATA
		except:
			node2 = None

		if node1:
			if node2:
				node1._nodeList.extend (node2._nodeList)
		else:
			if node2:
				self.root.VOTABLE.RESOURCE.TABLE.DATA.TABLEDATA = node2
		
	def output2CSV (self, write=sys.stdout.write):
		""" Outputs the content in CSV format
			with fields names as column heads.
		"""
		fields = self.getFieldsAttrs ()
		if not fields: return
		headers = []
		for colnr,fd in enumerate (fields):
			hdr = "col%d" % colnr
			for i in ['name', 'ID', 'id']:
				try:
					hdr = fd[i]
					break
				except:
					pass
			headers.append (hdr)
	
		write (','.join (headers))
	
		rows = self.getDataRows ()
		getData = self.getData # get a shortcut for the method
		for row in rows:
			write (','.join (getData (row)))

if __name__ == '__main__':
	votable = VOTable()
	votable.parse (sys.argv[1])
	votable.printAllNodes ()
	#print [x.getAttribute ('ID') for x in votable.getFields () ]
	#print votable.root.VOTABLE.RESOURCE.TABLE.DATA.TABLEDATA.TR[1].TD[1]
	#print votable.getFields ()