Writing a UDF, a UDAF, and a UDTF in Python
Date: 2023-05-30 18:01:19
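1. UDF (user-defined function) example:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

spark = SparkSession.builder.getOrCreate()

def square(x):
    # Runs once per row and receives the value of the num column.
    return x * x

# Wrap the Python function and declare its Spark return type.
square_udf = udf(square, IntegerType())
df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["num"])
df.withColumn("square", square_udf("num")).show()
```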
Output:
```
+---+------+
|num|square|
+---+------+
|  1|     1|
|  2|     4|
|  3|     9|
|  4|    16|
|  5|    25|
+---+------+
```
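A Python UDF receives `None` whenever the input column is NULL, and `square` as written would raise a `TypeError` on such a row. The following is a minimal sketch of a null-safe variant; the name `safe_square` is hypothetical and not part of the original example. It is plain Python so it can be checked without a Spark session.

```python
from typing import Optional


def safe_square(x: Optional[int]) -> Optional[int]:
    """Square x, passing None (SQL NULL) through unchanged."""
    if x is None:
        return None
    return x * x


# Plain-Python check of the behaviour; no Spark session is needed.
print([safe_square(v) for v in (1, 2, None)])  # prints [1, 4, None]

# With PySpark it would be wrapped the same way as square:
#   safe_square_udf = udf(safe_square, IntegerType())
```

Returning `None` keeps the row in the result with a NULL in the new column, which matches how most built-in Spark functions treat NULL input.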
2. UDAF (user-defined aggregate function) example. A plain `udf` runs once per row and cannot be passed to `agg`, so this version uses a Series-to-scalar pandas UDF, which requires pandas and PyArrow:
```python
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Series-to-scalar pandas UDF: receives the whole column (per group) and returns one value.
@pandas_udf("double")
def my_average(values: pd.Series) -> float:
    return values.mean()

df = spark.createDataFrame([(1,), (2,), (3,), (4,), (5,)], ["num"])
df.agg(my_average(df.num)).show()
```
Output:
```
+---------------+
|my_average(num)|
+---------------+
|            3.0|
+---------------+
```
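3. UDTF (user-defined table-generating function) example. Here the table function is emulated by a UDF that returns an array, which `explode` then expands into one row per element:
```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.getOrCreate()

def split_words(text):
    # Returns a list of words; explode() turns each element into its own row.
    return text.split()

split_words_udf = udf(split_words, ArrayType(StringType()))
df = spark.createDataFrame([("hello world",), ("spark is awesome",)], ["text"])
df.select(explode(split_words_udf(df.text))).show()
```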
Output:
```
+-------+
|    col|
+-------+
|  hello|
|  world|
|  spark|
|     is|
|awesome|
+-------+
```
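What distinguishes a table function from a scalar UDF is that one input row may produce zero, one, or many output rows. A minimal plain-Python sketch of that shape follows; the class name `SplitWords` is hypothetical. As far as I know, PySpark 3.5 and later accept a class with an `eval` method of this form through the `pyspark.sql.functions.udtf` decorator, but the code below deliberately does not depend on Spark.

```python
from typing import Iterator, Tuple


class SplitWords:
    """Table-function shape: eval() yields zero or more rows per input row."""

    def eval(self, text: str) -> Iterator[Tuple[str]]:
        # Each yielded tuple is one output row with a single column.
        for word in text.split():
            yield (word,)


# Apply the table function to every input row and flatten the results.
rows = [
    row
    for text in ("hello world", "spark is awesome")
    for row in SplitWords().eval(text)
]
print(rows)
```

An empty input string yields no rows at all, which is the same behaviour `explode` shows for an empty array.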