pandas.merge()方法源码阅读笔记
pandas.merge()方法
pandas/core/reshape/merge.py#L45-L54
# Documentation for merge() is handled by the two decorators below (they
# process the docstring), so no inline docstring is added here.
@Substitution('\nleft : DataFrame')
@Appender(_merge_doc, indents=0)
def merge(left, right, how='inner', on=None, left_on=None, right_on=None,
          left_index=False, right_index=False, sort=False,
          suffixes=('_x', '_y'), copy=True, indicator=False):
    # Thin wrapper: every argument is forwarded unchanged to
    # _MergeOperation, and the value of its get_result() is returned.
    op = _MergeOperation(left, right, how=how, on=on, left_on=left_on,
                         right_on=right_on, left_index=left_index,
                         right_index=right_index, sort=sort, suffixes=suffixes,
                         copy=copy, indicator=indicator)
    return op.get_result()
由此可见,pandas.merge() 方法的主体是建立 _MergeOperation 类对象,并通过 get_result() 方法获取该操作的执行结果。因此 merge 操作的核心应该在 _MergeOperation 类中。
@Substitution 和 @Appender 两个装饰器来自 pandas.util._decorators 模块,用于处理 docstring。由于不是源码阅读的重点,因此这里不做过多解释。
吐槽:光是文档注释就要用到装饰器的语言特性进行处理,是该称赞Python的动态特性呢,还是该批评其对于性能的影响呢。
_MergeOperation类
整个类的定义在 pandas/core/reshape/merge.py#L491-L953 中。
构造函数(__init__ 方法)
pandas/core/reshape/merge.py#L491-L562
class _MergeOperation(object):
    """
    Perform a database (SQL) merge operation between two DataFrame objects
    using either columns as keys or their row indexes
    """
    _merge_type = 'merge'

    def __init__(self, left, right, how='inner', on=None,
                 left_on=None, right_on=None, axis=1,
                 left_index=False, right_index=False, sort=True,
                 suffixes=('_x', '_y'), copy=True, indicator=False):
        """
        Store the merge options, type-check them and resolve the join keys.

        Raises
        ------
        ValueError
            If ``indicator`` is neither a string nor a bool, if ``left`` or
            ``right`` is not a DataFrame, or if ``left_index`` /
            ``right_index`` is not a bool.

        Warns
        -----
        UserWarning
            If ``left.columns`` and ``right.columns`` have a different
            number of levels.
        """
        # orig_left / orig_right start out as the same objects as
        # left / right.
        self.left = self.orig_left = left
        self.right = self.orig_right = right
        self.how = how
        self.axis = axis

        self.on = com._maybe_make_list(on)
        self.left_on = com._maybe_make_list(left_on)
        self.right_on = com._maybe_make_list(right_on)

        self.copy = copy
        self.suffixes = suffixes
        self.sort = sort

        self.left_index = left_index
        self.right_index = right_index

        self.indicator = indicator

        # A string is used as the indicator name as-is; True selects the
        # name '_merge'; False leaves the name as None.
        if isinstance(self.indicator, compat.string_types):
            self.indicator_name = self.indicator
        elif isinstance(self.indicator, bool):
            self.indicator_name = '_merge' if self.indicator else None
        else:
            raise ValueError(
                'indicator option can only accept boolean or string arguments')

        if not isinstance(left, DataFrame):
            raise ValueError(
                'can not merge DataFrame with instance of '
                'type {0}'.format(type(left)))
        if not isinstance(right, DataFrame):
            raise ValueError(
                'can not merge DataFrame with instance of '
                'type {0}'.format(type(right)))

        if not is_bool(left_index):
            raise ValueError(
                'left_index parameter must be of type bool, not '
                '{0}'.format(type(left_index)))
        if not is_bool(right_index):
            raise ValueError(
                'right_index parameter must be of type bool, not '
                '{0}'.format(type(right_index)))

        # warn user when merging between different levels
        if left.columns.nlevels != right.columns.nlevels:
            msg = ('merging between different levels can give an unintended '
                   'result ({0} levels on the left, {1} on the right)')
            msg = msg.format(left.columns.nlevels, right.columns.nlevels)
            warnings.warn(msg, UserWarning)

        self._validate_specification()

        # note this function has side effects
        (self.left_join_keys,
         self.right_join_keys,
         self.join_names) = self._get_merge_keys()

        # validate the merge keys dtypes. We may need to coerce
        # to avoid incompat dtypes
        self._maybe_coerce_merge_keys()
__init__() 方法前面很大一部分都在做对象的数据成员的赋值,其中对于不少参数还要进行类型判断以进行特殊处理或者报错。其中还用到了 com._maybe_make_list() 方法处理 on、left_on、right_on 这三个参数的值。com 是 pandas.core.common 模块的别名,_maybe_make_list() 方法其实是将一个非 list/tuple 的单一元素转换为包含该单个元素的 list(None 则原样返回,因此后面的校验中仍然可以用 is None 来判断参数是否被指定)。一大堆赋值和处理、报错过后,终于执行到了第一个类内方法:_validate_specification()。
校验 merge 参数:_validate_specification() 方法
pandas/core/reshape/merge.py#L911-L953
class _MergeOperation(object):
    # ...
    def _validate_specification(self):
        # Hm, any way to make this logic less complicated??
        #
        # Checks the combination of on / left_on / right_on / left_index /
        # right_index and fills in self.left_on and self.right_on in place.
        # Raises MergeError or ValueError on an inconsistent combination.
        if self.on is None and self.left_on is None and self.right_on is None:

            if self.left_index and self.right_index:
                # index-on-index merge: no key columns on either side
                self.left_on, self.right_on = (), ()
            elif self.left_index:
                # right_on is known to be None in this branch, so this
                # always raises
                if self.right_on is None:
                    raise MergeError('Must pass right_on or right_index=True')
            elif self.right_index:
                # left_on is known to be None in this branch, so this
                # always raises
                if self.left_on is None:
                    raise MergeError('Must pass left_on or left_index=True')
            else:
                # use the common columns
                common_cols = self.left.columns.intersection(
                    self.right.columns)
                if len(common_cols) == 0:
                    raise MergeError('No common columns to perform merge on')
                if not common_cols.is_unique:
                    raise MergeError("Data columns not unique: %s"
                                     % repr(common_cols))
                self.left_on = self.right_on = common_cols
        elif self.on is not None:
            # "on" is exclusive with left_on / right_on
            if self.left_on is not None or self.right_on is not None:
                raise MergeError('Can only pass argument "on" OR "left_on" '
                                 'and "right_on", not a combination of both.')
            self.left_on = self.right_on = self.on
        elif self.left_on is not None:
            n = len(self.left_on)
            if self.right_index:
                if len(self.left_on) != self.right.index.nlevels:
                    raise ValueError('len(left_on) must equal the number '
                                     'of levels in the index of "right"')
                # one None placeholder per left key; the right side joins
                # on its index
                self.right_on = [None] * n
        elif self.right_on is not None:
            n = len(self.right_on)
            if self.left_index:
                if len(self.right_on) != self.left.index.nlevels:
                    raise ValueError('len(right_on) must equal the number '
                                     'of levels in the index of "left"')
                # one None placeholder per right key; the left side joins
                # on its index
                self.left_on = [None] * n
        # NOTE(review): when only left_on is given without right_index (or
        # only right_on without left_index), the other side is still None
        # here and len() raises TypeError rather than a MergeError.
        if len(self.right_on) != len(self.left_on):
            raise ValueError("len(right_on) must equal len(left_on)")
上来先是一句吐槽。整个校验还是集中在对于 merge 的匹配参数on、left_on、right_on的校验上。
- 首先是 on、left_on、right_on 都没有指定的情况:
  - 全部或者部分指定了 left_index 或者 right_index 参数?反正你要显式指定用于匹配的列或者用索引进行匹配;
  - 都没有指定?使用左右两个 DataFrame 中的公共列。
- 指定了 on 参数:
  - 还指定了 left_on 或者 right_on 参数?不行,报错。
- 指定匹配一边的列和另一边的索引:
  - 列的数量和索引的级别数量对么?

最后再次检验两边匹配的列的数量。
整个逻辑还好吧,也没有很复杂啊,不如说有点琐碎。
_get_merge_keys() 方法
pandas/core/reshape/merge.py#L755-L864
class _MergeOperation(object):
    # ...
    def _get_merge_keys(self):
        """
        Note: has side effects (copy/delete key columns)

        Builds the join keys from self.left, self.right, self.left_on and
        self.right_on. May rebind self.left / self.right to the result of
        dropping a duplicated key column.

        Returns
        -------
        left_keys, right_keys, join_names
        """
        left_keys = []
        right_keys = []
        join_names = []
        right_drop = []
        left_drop = []
        left, right = self.left, self.right

        # A key counts as "array-like" when it is an ndarray/Series whose
        # length matches the corresponding frame; anything else is looked up
        # as a column label.
        is_lkey = lambda x: isinstance(
            x, (np.ndarray, Series)) and len(x) == len(left)
        is_rkey = lambda x: isinstance(
            x, (np.ndarray, Series)) and len(x) == len(right)

        # Note that pd.merge_asof() has separate 'on' and 'by' parameters. A
        # user could, for example, request 'left_index' and 'left_by'. In a
        # regular pd.merge(), users cannot specify both 'left_index' and
        # 'left_on'. (Instead, users have a MultiIndex). That means the
        # self.left_on in this function is always empty in a pd.merge(), but
        # a pd.merge_asof(left_index=True, left_by=...) will result in a
        # self.left_on array with a None in the middle of it. This requires
        # a work-around as designated in the code below.
        # See _validate_specification() for where this happens.

        # ugh, spaghetti re #733
        if _any(self.left_on) and _any(self.right_on):
            # keys given for both sides: walk them pairwise
            for lk, rk in zip(self.left_on, self.right_on):
                if is_lkey(lk):
                    left_keys.append(lk)
                    if is_rkey(rk):
                        right_keys.append(rk)
                        join_names.append(None)  # what to do?
                    else:
                        if rk is not None:
                            right_keys.append(right[rk]._values)
                            join_names.append(rk)
                        else:
                            # work-around for merge_asof(right_index=True)
                            right_keys.append(right.index)
                            join_names.append(right.index.name)
                else:
                    if not is_rkey(rk):
                        if rk is not None:
                            right_keys.append(right[rk]._values)
                        else:
                            # work-around for merge_asof(right_index=True)
                            right_keys.append(right.index)
                        if lk is not None and lk == rk:
                            # avoid key upcast in corner case (length-0)
                            if len(left) > 0:
                                right_drop.append(rk)
                            else:
                                left_drop.append(lk)
                    else:
                        right_keys.append(rk)
                    if lk is not None:
                        left_keys.append(left[lk]._values)
                        join_names.append(lk)
                    else:
                        # work-around for merge_asof(left_index=True)
                        left_keys.append(left.index)
                        join_names.append(left.index.name)
        elif _any(self.left_on):
            # keys only on the left: the right side contributes its index
            for k in self.left_on:
                if is_lkey(k):
                    left_keys.append(k)
                    join_names.append(None)
                else:
                    left_keys.append(left[k]._values)
                    join_names.append(k)
            if isinstance(self.right.index, MultiIndex):
                right_keys = [lev._values.take(lab)
                              for lev, lab in zip(self.right.index.levels,
                                                  self.right.index.labels)]
            else:
                right_keys = [self.right.index.values]
        elif _any(self.right_on):
            # keys only on the right: the left side contributes its index
            for k in self.right_on:
                if is_rkey(k):
                    right_keys.append(k)
                    join_names.append(None)
                else:
                    right_keys.append(right[k]._values)
                    join_names.append(k)
            if isinstance(self.left.index, MultiIndex):
                left_keys = [lev._values.take(lab)
                             for lev, lab in zip(self.left.index.levels,
                                                 self.left.index.labels)]
            else:
                left_keys = [self.left.index.values]

        # The side effect announced in the docstring: drop the key columns
        # collected above.
        if left_drop:
            self.left = self.left.drop(left_drop, axis=1)

        if right_drop:
            self.right = self.right.drop(right_drop, axis=1)

        return left_keys, right_keys, join_names
_maybe_coerce_merge_keys()方法
pandas/core/reshape/merge.py#L866-L909
def _maybe_coerce_merge_keys(self):
    # we have valid mergee's but we may have to further
    # coerce these if they are originally incompatible types
    #
    # for example if these are categorical, but are not dtype_equal
    # or if we have object and integer dtypes
    #
    # Reads self.left_join_keys / self.right_join_keys / self.join_names
    # and may rebind self.left / self.right with the key column cast to
    # object. Returns None.

    for lk, rk, name in zip(self.left_join_keys,
                            self.right_join_keys,
                            self.join_names):
        # exactly one side is empty: nothing to reconcile
        if (len(lk) and not len(rk)) or (not len(lk) and len(rk)):
            continue

        # if either left or right is a categorical
        # then the must match exactly in categories & ordered
        if is_categorical_dtype(lk) and is_categorical_dtype(rk):
            if lk.is_dtype_equal(rk):
                continue
        elif is_categorical_dtype(lk) or is_categorical_dtype(rk):
            pass

        elif is_dtype_equal(lk.dtype, rk.dtype):
            continue

        # if we are numeric, then allow differing
        # kinds to proceed, eg. int64 and int8
        # further if we are object, but we infer to
        # the same, then proceed
        if (is_numeric_dtype(lk) and is_numeric_dtype(rk)):
            if lk.dtype.kind == rk.dtype.kind:
                continue

            # NOTE(review): the nesting of this check under the numeric
            # branch was reconstructed from a whitespace-stripped
            # listing -- confirm against the upstream file.
            # let's infer and see if we are ok
            if lib.infer_dtype(lk) == lib.infer_dtype(rk):
                continue

        # Houston, we have a problem!
        # let's coerce to object
        if name in self.left.columns:
            self.left = self.left.assign(
                **{name: self.left[name].astype(object)})
        if name in self.right.columns:
            self.right = self.right.assign(
                **{name: self.right[name].astype(object)})
获取 merge 结果(get_result() 方法)
class _MergeOperation(object):
    # ...
    def get_result(self):
        """
        Execute the merge and return the merged object.

        The result is built with ``self.left._constructor`` and finalized
        with ``method=self._merge_type``.
        """
        # When an indicator is requested, both inputs are passed through
        # _indicator_pre_merge first and the result through
        # _indicator_post_merge at the end.
        if self.indicator:
            self.left, self.right = self._indicator_pre_merge(
                self.left, self.right)

        join_index, left_indexer, right_indexer = self._get_join_info()

        ldata, rdata = self.left._data, self.right._data
        lsuf, rsuf = self.suffixes

        llabels, rlabels = items_overlap_with_suffix(ldata.items, lsuf,
                                                     rdata.items, rsuf)

        # An indexer of None means no indexer is passed for that side.
        lindexers = {1: left_indexer} if left_indexer is not None else {}
        rindexers = {1: right_indexer} if right_indexer is not None else {}

        result_data = concatenate_block_managers(
            [(ldata, lindexers), (rdata, rindexers)],
            axes=[llabels.append(rlabels), join_index],
            concat_axis=0, copy=self.copy)

        typ = self.left._constructor
        result = typ(result_data).__finalize__(self, method=self._merge_type)

        if self.indicator:
            result = self._indicator_post_merge(result)

        # Called for its effect on ``result``; the return value is not used.
        self._maybe_add_join_keys(result, left_indexer, right_indexer)

        return result