User-defined Table Functions (UDTF)

Amandeep Singh Johar
May 22, 2024

Spark 3.5 introduces Python user-defined table functions (UDTFs), a new kind of user-defined function. Unlike a scalar function, which returns a single value per call, a UDTF is invoked in the FROM clause of a query and returns an entire table. A UDTF call accepts any number of arguments, each of which is either a scalar expression or a table argument representing a complete input table.

Code sample for UDTF
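
Here is a minimal sketch of what such a code sample could look like, assuming PySpark 3.5 or later is installed. The class name `SquareNumbers`, its output schema, the SQL name `square_numbers`, and the helper `run_with_spark` are illustrative choices, not taken from the article. The handler is kept as a plain Python class so its row-generating logic can be tried without a cluster; the Spark-specific wiring sits in the helper.

```python
class SquareNumbers:
    """UDTF handler: emits one (num, squared) row per integer in [start, end]."""

    def eval(self, start: int, end: int):
        # Each yielded tuple becomes one row of the output table.
        for num in range(start, end + 1):
            yield (num, num * num)


def run_with_spark():
    """Wrap the handler as a UDTF and call it (assumes PySpark >= 3.5)."""
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit, udtf

    spark = SparkSession.builder.getOrCreate()

    # The return type describes the schema of the output table.
    square_num = udtf(SquareNumbers, returnType="num: int, squared: int")

    # DataFrame API: scalar arguments are passed as column expressions.
    square_num(lit(1), lit(3)).show()

    # SQL: once registered, the function is called in the FROM clause.
    spark.udtf.register("square_numbers", square_num)
    spark.sql("SELECT * FROM square_numbers(1, 3)").show()
```

Calling `run_with_spark()` in an environment with Spark available should display the same three rows through both the DataFrame API and SQL. The `eval` method can also be called directly, for example `list(SquareNumbers().eval(1, 3))`, to inspect the rows it produces.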

Why Use Python UDTFs

If you need a function that generates multiple rows and columns while taking advantage of the extensive Python ecosystem, Python UDTFs are ideal.

Python UDTFs vs. Python UDFs

Python UDFs in Spark are designed to accept zero or more scalar values as input and return a single value. In contrast, UDTFs provide greater flexibility by allowing the return of multiple rows and columns, thus extending the functionality of UDFs.
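
The difference can be sketched with two small pieces of logic, assuming PySpark 3.5 or later for the registration step. The names `word_count`, `SplitWords`, and `register_with_spark` are hypothetical and chosen only for illustration.

```python
def word_count(text: str) -> int:
    """Scalar UDF logic: one input value in, exactly one value out."""
    return len(text.split())


class SplitWords:
    """UDTF logic: one input value in, zero or more (word, length) rows out."""

    def eval(self, text: str):
        for word in text.split():
            yield (word, len(word))


def register_with_spark():
    """Wrap both pieces for use in Spark (assumes PySpark >= 3.5)."""
    from pyspark.sql.functions import udf, udtf

    count_udf = udf(word_count, returnType="int")
    split_udtf = udtf(SplitWords, returnType="word: string, length: int")
    return count_udf, split_udtf
```

For the input "hello big world", the UDF logic returns the single value 3, while the UDTF logic yields three rows with two columns each.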

Python UDTFs vs. SQL UDTFs

While SQL UDTFs are efficient and versatile, Python offers a broader array of libraries and tools. For transformations or computations that require advanced techniques, such as statistical functions or machine learning inferences, Python UDTFs are particularly advantageous.

UDTF with LangChain

The previous example might seem basic, so let’s explore a more engaging scenario by integrating Python UDTFs with LangChain.

from langchain.chains import LLMChain
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from pyspark.sql.functions import lit, udtf

@udtf(returnType="keyword: string")
class KeywordsGenerator:
    """
    Generate a list of comma separated keywords about a topic using an LLM.
    Output only the keywords.
    """
    def __init__(self):
        # Replace the placeholder with your own OpenAI API key.
        llm = OpenAI(model_name="gpt-4", openai_api_key="<your-key>")
        prompt = PromptTemplate(
            input_variables=["topic"],
            template="generate a couple of comma separated keywords about {topic}. Output only the keywords."
        )
        self.chain = LLMChain(llm=llm, prompt=prompt)

    def eval(self, topic: str):
        response = self.chain.run(topic)
        keywords = [keyword.strip() for keyword in response.split(",")]
        # Emit one single-column row per keyword.
        for keyword in keywords:
            yield (keyword, )
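
The row-splitting logic in `eval` can be checked without calling an LLM. Below is a minimal sketch, assuming a stand-in chain object (the hypothetical `FakeChain`) whose `run` method returns a fixed string where the real chain would return the model's response. `KeywordsGeneratorOffline` is also a hypothetical name; it takes the chain as a constructor argument, so it is meant for local checks only and not for registration as a UDTF.

```python
class FakeChain:
    """Hypothetical stand-in for LLMChain; returns a canned response."""

    def __init__(self, response: str):
        self.response = response

    def run(self, topic: str) -> str:
        return self.response


class KeywordsGeneratorOffline:
    """Same eval logic as KeywordsGenerator, with the chain injected."""

    def __init__(self, chain):
        self.chain = chain

    def eval(self, topic: str):
        response = self.chain.run(topic)
        for keyword in response.split(","):
            keyword = keyword.strip()
            # Skipping empty pieces (e.g. after a trailing comma) is an
            # addition here; the original eval yields them as empty strings.
            if keyword:
                yield (keyword,)


generator = KeywordsGeneratorOffline(FakeChain("spark, python , udtf,"))
rows = list(generator.eval("apache spark"))
print(rows)  # [('spark',), ('python',), ('udtf',)]
```

With a valid API key and an active Spark session, the real UDTF would presumably be invoked in the usual way, for example `KeywordsGenerator(lit("apache spark")).show()`, which matches the `lit` import in the listing above.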

Details:-

Happy learning 🙂 !!!!!!

