postgres TOAST机制与源码解析

Posted by CCH on September 22, 2020

本文通过Postgres 的TOAST相关源代码, 解析postgres 的TOAST机制. 如果有错误, 欢迎大家指出.

什么是TOAST

TOAST 全称为 The Oversized-Attribute Storage Technique, 是PostgreSQL 用来存储大数据量的字段的机制. 在PG中, 对于任意一张普通表, 如果表中存在字段可能需要toast的,都会有一张附属的toast表. 如果某一行数据的某个字段长度非常长, 超过了某个阈值(这个阈值大约是BLCKSZ/4, 在单机PG中, 这个值是2KB, 在ADB PG中为8KB), 那么就会触发TOAST机制. TOAST机制会根据这个字段的storage属性, 如果允许压缩, 会先将字段数据进行压缩. 如果压缩后长度依然超过阈值, 那么就将这个字段的数据, 存储在附属的toast表里, 只在原来字段中存储一个”指针”, 记录着这个字段实际数据的”地址”,这个操作称为线外存储(EXTERNAL_ONDISK).

postgres=# CREATE TABLE test_toast(a text);
postgres=# SELECT reltoastrelid FROM pg_class WHERE relname='test_toast';
 reltoastrelid
---------------
         24596
(1 row)
postgres=# SELECT 24596::regclass;
        regclass
-------------------------
 pg_toast.pg_toast_24593
(1 row)
postgres=# SELECT * FROM pg_toast.pg_toast_24593
 chunk_id | chunk_seq | chunk_data
----------+-----------+------------
(0 rows)

可以看到对普通表test_toast 有一张附属的pg_toast_24593表, 这张表专门用来存放超过阈值长度的字段值. toast表有3个字段, chunk_id, chunk_seq 和 chunk_data. 当字段数据存储到toast表中时, 数据会被切分成chunk块, chunk_seq 表示的是某个chunk块在整个字段数据的第几块. chunk_id是当某个字段的数据需要toast存储时,分配到的一个OID, 唯一的标识了这个字段的数据. chunk_data存放实际的数据.

postgres=# CREATE FUNCTION f_random_str(length INTEGER) returns character varying
postgres-# LANGUAGE plpgsql
postgres-# AS $$
postgres$# DECLARE
postgres$# result varchar;
postgres$# BEGIN
postgres$# SELECT array_to_string(ARRAY(SELECT chr((65 + round(random() * 25)) :: integer)
postgres$# FROM generate_series(1,length)), '') INTO result;
postgres$# return result;
postgres$# END
postgres$# $$;
CREATE FUNCTION
postgres=# INSERT INTO test_toast VALUES(f_random_str(3000));
INSERT 0 1
postgres=# SELECT chunk_id, chunk_seq, length(chunk_data) FROM pg_toast.pg_toast_24593;
 chunk_id | chunk_seq | length
----------+-----------+--------
    24639 |         0 |   1996
    24639 |         1 |   1004
(2 rows)

在对表写入一个长度为3000字节的字符串后, 因为字段长度超过了阈值, 于是PG将该字段存储到了pg_toast_24593表中去, 且将字段数据切分成了2块, 第一块长度为1996, 第二块长度为1004. 实际上当数据存储到TOAST表之前, PG会先对数据进行压缩, 如果压缩后, 数据长度仍超过阈值, 才会将数据存到toast表中. 上面的demo由于产生的是随机字符串, PG自带的压缩算法无法很好的压缩, 实际存储到toast表中的依然总长度为3000字节.

为什么要TOAST

为什么PG里面要引入TOAST机制? 我的理解是为了节省查询时使用的内存. 比如有一个普通表T(a int, b double, c text), 其中字段c的数据都非常长, 触发了TOAST机制. 大部分对这个表的查询, 都会围绕这个表的主属性 a 来完成, 而字段c的值只会在最后返回结果的时候用到, 在查询的解析、优化、执行过程中,并不会用到字段c的具体值. 因此我们在查询过程中, 对字段c 只保存一个toast的“指针”, 查询过程用到的内存就会变小很多.

TOAST 原理

在理解TOAST的原理之前, 先要理解PG中对表字段的表示方法. TOAST机制只对变长的字段起作用, 如text. 对于固定长度的字段如int, double等, 它们本来就不会发生字段长度超过阈值的情况. 对于变长的字段, PG统一使用一个结构体 varlena 来表示.

struct varlena
{
	char		vl_len_[4];
	char		vl_dat[1];
};

总体来讲, 变长字段是由一个头部信息+字段值组成, 只是在不同的情况下, 头部信息的结构都会不一样.

  1. 00xxxxxx 当头部前两位为00时, 那么表示这个变长字段是一个未压缩的数据, 且头部的 00位之后的30位表示数据长度, 那么最长能表示2^30 byte = 1GB长度的数据
  2. 01xxxxxx 当头部前两位为01时, 那么表示这个变长字段是一个压缩的数据, 且头部的 00位之后的30位表示数据压缩后长度, 最长能表示2^30 byte = 1GB长度的压缩数据
  3. 10000000 当头部第一位为1, 之后的七为全为0时, 表示一个已经TOAST的数据, 接下来的一个字节会表示指针的类型, 而字段值则是存放指针.
  4. 1xxxxxxx 当数据头部第一位为1, 之后不全为0时, 表示一个短字符串, 接下来的7为表示数据长度, 最长表示128字节的数据.

TOAST 指针

typedef struct varatt_external
{
	int32		va_rawsize;		/* Original data size (includes header) */
	int32		va_extsize;		/* External saved size (doesn't) */
	Oid			va_valueid;		/* Unique ID of value within TOAST table */
	Oid			va_toastrelid;	/* RelID of TOAST table containing it */
}	varatt_external;

typedef struct varatt_indirect
{
	struct varlena *pointer;	/* Pointer to in-memory varlena */
}	varatt_indirect;

所谓指针, 其实指的是存储在varlena 数据区中的数据, 该数据描述了去哪里读真正的数据. 当varlena头部以10000000开头时, 那么接下来的第二个字节会是一个tag,表示数据区存放的是varatt_external结构体格式的数据,还是varatt_indirect格式的数据. 而varatt_indirect实际上是一个指向 TOAST varlena的内存指针, 因此最终也会通过读取TOAST varlena中的varatt_external来获取toast数据.

varatt_external中 va_rawsize字段表示原始数据的长度, va_extsize表示进行TOAST存储后数据的长度, 如果没有进行压缩, 那么va_extsize 会等于 va_rawsize. va_valueid表示该数据在TOAST表中的chunk_id, va_toastrelid存储TOAST表的OID. 通过va_toastrelid + va_valueid, PG就可以找到相应的toast表, 扫描得到chunk_id == va_valueid的行, 将它们的chunk_data按照chunk_seq的顺序拼接起来, 按需解压后, 即可得到原始数据.

TOAST 操作

PG代码中, TOAST相关的操作, 最终调用的函数都是toast_insert_or_update, toast_fetch_datum, toast_delete_datum, 分别负责toast数据的插入/更新, 读取, 删除. 只要分析这三个函数的代码, 就可以大致了解toast的原理.

获取TOAST数据

获取toast数据 通过 toast_fetch_datum函数完成

static struct varlena * toast_fetch_datum(struct varlena * attr);

该函数接收一个varlena数据, varlena的数据区会存放着varatt_external的 toast “指针”.

  1. 首先, 会读取指针中的va_extsize, 计算返回结果需要的内存长度,使用palloc 为其分配内存, 并设置头部信息 如果这个TOAST数据还是被压缩的, 那么需要设置结果varlena头部为10xxxxxx开头.表示数据已压缩
     ressize = toast_pointer.va_extsize;
     numchunks = ((ressize - 1) / TOAST_MAX_CHUNK_SIZE) + 1;
    
     result = (struct varlena *) palloc(ressize + VARHDRSZ);
    
     if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
         SET_VARSIZE_COMPRESSED(result, ressize + VARHDRSZ);
     else
         SET_VARSIZE(result, ressize + VARHDRSZ);
    
  2. 从toast指针中获取TOAST表OID, 打开TOAST表及其索引, 初始化扫描条件为va_valueid, 即chunk_id字段 ```C /*
    • Open the toast relation and its indexes */ toastrel = heap_open(toast_pointer.va_toastrelid, AccessShareLock); toasttupDesc = toastrel->rd_att;

    /* Look for the valid index of the toast relation */ validIndex = toast_open_indexes(toastrel, AccessShareLock, &toastidxs, &num_indexes);

    /*

    • Setup a scan key to fetch from the index by va_valueid */ ScanKeyInit(&toastkey, (AttrNumber) 1, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(toast_pointer.va_valueid)); ```
  3. 开始扫描TOAST表, 按顺序拼接chunk_data
     toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
                                            SnapshotToast, 1, &toastkey);
     while ((ttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
     {
         /*
          * Have a chunk, extract the sequence number and the data
          */
         residx = DatumGetInt32(fastgetattr(ttup, 2, toasttupDesc, &isnull));
         Assert(!isnull);
         chunk = DatumGetPointer(fastgetattr(ttup, 3, toasttupDesc, &isnull));
         Assert(!isnull);
    		
         ....
         ....
         memcpy(VARDATA(result) + residx * TOAST_MAX_CHUNK_SIZE,
                chunkdata,
                chunksize); //将chunk_data 拼接到result
     }
    

    更新和插入TOAST元组

    更新或插入toast的元组,都是通过toast_insert_or_update来完成. 这个函数同时处理INSERT和UPDATE两种情况, 参数中包括HeapTuple newtup, HeapTuple oldtup. 如果oldtup==NULL就表示这次是插入操作. 无论插入还是更新, 这个函数的重点都是对新的数据,根据其字段属性进行合适的操作.

另外需要说明的是, PG的代码中,使用 ‘p’, ‘x’, ‘e’, ‘m’ 来表示字段的存储策略(storage), 其中’p’ 表示不进行压缩和线外存储, ‘x’ 表示只进行线外存储且不允许压缩, ‘e’表示允许进行线外存储和压缩, ‘m’ 表示只进行压缩,不允许线外存储.

toast_insert_or_update 对新的tuple的处理流程如下

  1. 首先从HeapTuple结构的数据中, 提取出Datum 数组和 isnull bool型数组
heap_deform_tuple(newtup, tupleDesc, toast_values, toast_isnull);

2. 遍历新tuple的每一列.

  • 若为元组更新操作, 且该列是线外存储, 且新旧元组中的toast指针值不一样, 就需要标记该列数据为需要删除
  • 对于定长类型的字段, 直接设置toast策略为’p’. 对不定长类型的字段, 若字段定义的storage策略为’p’, 则设置toast策略为’p’.
//新旧元组中的toast指针值不一样, 就需要标记该列数据为需要删除
if (att[i]->attlen == -1 && !toast_oldisnull[i] &&
				VARATT_IS_EXTERNAL_ONDISK(old_value))
			{
				if (toast_isnull[i] || !VARATT_IS_EXTERNAL_ONDISK(new_value) ||
					memcmp((char *) old_value, (char *) new_value,
						   VARSIZE_EXTERNAL(old_value)) != 0)
				{
					/*
					 * The old external stored value isn't needed any more
					 * after the update
					 */
					toast_delold[i] = true;
					need_delold = true;
				}
			}
....

//不定长类型的字段, 若字段定义的storage策略为'p', 则设置toast策略为'p'
if (att[i]->attlen == -1)
{
			/*
			 * If the table's attribute says PLAIN always, force it so.
			 */
			if (att[i]->attstorage == 'p')
				toast_action[i] = 'p';
}
  1. 第三步, 分4个循环来处理

第一个循环, 找出目前既可以被压缩,也可以被线外存储的数据长度最大的字段, 首先对其进行压缩, 如果压缩完之后仍超过TOAST阈值(2KB), 那么再将压缩后的数据线外存储

while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen)
{
		/*
		 * 找到当前可以被压缩,最大的字段
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] != ' ')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
				continue;
			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		//调用toast_compress_datum进行压缩
		i = biggest_attno;
		if (att[i]->attstorage == 'x')
		{
			old_value = toast_values[i];
			new_value = toast_compress_datum(old_value);

			if (DatumGetPointer(new_value) != NULL)
			{
				/* successful compression */
				if (toast_free[i])
					pfree(DatumGetPointer(old_value));
				toast_values[i] = new_value;
				toast_free[i] = true;
				toast_sizes[i] = VARSIZE(DatumGetPointer(toast_values[i]));
				need_change = true;
				need_free = true;
			}
		}

		....
		//如果压缩完之后, 依然超过maxDataLen, 则进行线外存储
		if (toast_sizes[i] > maxDataLen &&
			rel->rd_rel->reltoastrelid != InvalidOid)
		{
			old_value = toast_values[i];
			toast_action[i] = 'p';
			toast_values[i] = toast_save_datum(rel, toast_values[i],
											   toast_oldexternal[i], options);
			if (toast_free[i])
				pfree(DatumGetPointer(old_value));
			toast_free[i] = true;
			need_change = true;
			need_free = true;
		}

}

第二个循环, 找到可以进行线外存储的字段(‘x’ 或 ‘e’) 进行线外存储.

while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen &&
		   rel->rd_rel->reltoastrelid != InvalidOid)
	{
		//找到数据长度最大的字段
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (att[i]->attstorage != 'x' && att[i]->attstorage != 'e')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		...
		//直接进行线外存储
		toast_action[i] = 'p';
		toast_values[i] = toast_save_datum(rel, toast_values[i],
										   toast_oldexternal[i], options);
	}

循环三, 将策略为‘m’的字段进行压缩

while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen)
	{
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;
		Datum		new_value;

		//找到策略为'm'的 数据长度最大的字段
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] != ' ')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (VARATT_IS_COMPRESSED(DatumGetPointer(toast_values[i])))
				continue;
			if (att[i]->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		if (biggest_attno < 0)
			break;

		/*
		 * 调用toast_compress_datum进行压缩
		 */
		i = biggest_attno;
		old_value = toast_values[i];
		new_value = toast_compress_datum(old_value);
	}

循环四, 如果前3次循环之后, 元组的长度仍超过阈值, 那么就找到策略为’m’(只允许压缩,不允许线外存储)的字段,对它进行线外存储. 这一层循环过后, 可以保证元组的长度会小于阈值, 因为最差的情况就是所有变长字段变成线外存储, 元组中只存了toast指针和定长字段.

	while (heap_compute_data_size(tupleDesc,
								  toast_values, toast_isnull) > maxDataLen &&
		   rel->rd_rel->reltoastrelid != InvalidOid)
	{
		int			biggest_attno = -1;
		int32		biggest_size = MAXALIGN(TOAST_POINTER_SIZE);
		Datum		old_value;

		/*--------
		 * Search for the biggest yet inlined attribute with
		 * attstorage = 'm'
		 *--------
		 */
		for (i = 0; i < numAttrs; i++)
		{
			if (toast_action[i] == 'p')
				continue;
			if (VARATT_IS_EXTERNAL(DatumGetPointer(toast_values[i])))
				continue;		/* can't happen, toast_action would be 'p' */
			if (att[i]->attstorage != 'm')
				continue;
			if (toast_sizes[i] > biggest_size)
			{
				biggest_attno = i;
				biggest_size = toast_sizes[i];
			}
		}

		if (biggest_attno < 0)
			break;

		/*
		 * 线外存储
		 */
		i = biggest_attno;
		old_value = toast_values[i];
		toast_action[i] = 'p';
		toast_values[i] = toast_save_datum(rel, toast_values[i],
										   toast_oldexternal[i], options);
		if (toast_free[i])
			pfree(DatumGetPointer(old_value));
		toast_free[i] = true;

		need_change = true;
		need_free = true;
	}

toast_save_datum

toast_insert_or_update 实际调用的是toast_save_datum()来将某个字段数据存储到toast辅助表中去.

toast_save_datum 首先获取一个OID来作为该toast数据的唯一标志, 然后将数据分成若干个chunk, 调用heap_isnert插入到toast辅助表中去

获取一个OID来作为该toast数据的唯一标志 chunk_id
	if (!OidIsValid(rel->rd_toastoid))
	{
		/* normal case: just choose an unused OID */
		toast_pointer.va_valueid =
			GetNewOidWithIndex(toastrel,
							   RelationGetRelid(toastidxs[validIndex]),
							   (AttrNumber) 1);
	}
	/*
	 * 将数据切分为chunk, 每个chunk数据和 chunk_id, chunk_seq组装为一个tuple, 调用
	 * heap_insert写入toast辅助表
	 */
	while (data_todo > 0)
	{
		int			i;

		/*
		 * Calculate the size of this chunk
		 */
		chunk_size = Min(TOAST_MAX_CHUNK_SIZE, data_todo);

		/*
		 * Build a tuple and store it
		 */
		t_values[1] = Int32GetDatum(chunk_seq++);
		SET_VARSIZE(&chunk_data, chunk_size + VARHDRSZ);
		memcpy(VARDATA(&chunk_data), data_p, chunk_size);
		toasttup = heap_form_tuple(toasttupDesc, t_values, t_isnull);

		heap_insert(toastrel, toasttup, mycid, options, NULL);
	}

删除TOAST数据

当包含TOAST数据的元组被删除时,该元组中被TOAST的数据在其TOAST表中也需要被删除。删除TOAST数据, 会调用toast_delete函数. 该函数接收一个Relation和需要删除的元组HeapTuple. 它会遍历元祖中的每个字段, 对于线外存储的字段(VARATT_IS_EXTERNAL_ONDISK(Datum) == true), 会调用toast_delete_datum, 来到toast表中删除这个字段的数据.

void
toast_delete(Relation rel, HeapTuple oldtup)
{
	....
	for (i = 0; i < numAttrs; i++)
	{
		if (att[i]->attlen == -1)
		{
			Datum		value = toast_values[i];

			if (toast_isnull[i])
				continue;
			else if (VARATT_IS_EXTERNAL_ONDISK(PointerGetDatum(value))) //对线外存储的字段调用toast_delete_datum
				toast_delete_datum(rel, value);
		}
	}
}

toast_delete_datum函数的过程与toast_fetch_datum类似, 打开表所关联的toast表, 找到需要的toast数据, 使用simple_heap_delete来标记删除

static void
toast_delete_datum(Relation rel, Datum value)
{
	/*
	 * 打开rel表所关联的tost表
	 */
	toastrel = heap_open(toast_pointer.va_toastrelid, RowExclusiveLock);

	/* 开始对toast表进行遍历, 找到chunk_id 相等的行 */
	toastscan = systable_beginscan_ordered(toastrel, toastidxs[validIndex],
										   SnapshotToast, 1, &toastkey);
	while ((toasttup = systable_getnext_ordered(toastscan, ForwardScanDirection)) != NULL)
	{
		/*
		 * Have a chunk, delete it
		 */
		simple_heap_delete(toastrel, &toasttup->t_self);
	}

	/*
	 * End scan and close relations
	 */
	systable_endscan_ordered(toastscan);
	toast_close_indexes(toastidxs, num_indexes, RowExclusiveLock);
	heap_close(toastrel, RowExclusiveLock);
}

对于每一个chunk_id 与目标chunk_id相等的行,使用simple_heap_delete进行删除. PG中使用simple_heap_delete对数据进行标记删除, 标记元组的xmax为删除该元组的xid, 并写相关的XLOG日志即可, 而不是将原始数据直接删除.