失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > 消费者人群画像 python_如何正确打开相似人群画像算法

消费者人群画像 python_如何正确打开相似人群画像算法

时间:2021-11-27 21:52:35

相关推荐

消费者人群画像 python_如何正确打开相似人群画像算法

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~

一、数据源

1、相似人群数据存在TDW库中,数据字典说明:

CREATE TABLE sim_people_tdw_tbl(

uid STRING COMMENT 'reader id',

sim_uids STRING COMMENT 'sim_uids',

sim_num BIGINT COMMENT 'sim_num',

update_date STRING COMMENT 'update_date'

)

字段类型含义uidstring用户标识sim_uidsstring与uid喜好相似的人群,格式为用户编号:相同阅读量,相似用户之间以逗号分隔sim_numBIGINT相似人群的人数update_datestring数据日期

2、基础用户画像存在MongoDB中

基础用户画像

字段含义_id用户idprofile(离线)positive(实时)用户正画像(喜欢),每个维度以分号间隔,每个子维度以逗号间隔,值格式为key_id:weight,维度含义依次为一级分类、二级分类、关键字、topic、阅读来源negative负画像(不喜欢),其他字段的含义与正画像一样update_time更新时间cityCode或city城市编码

3、相似人群画像也存在MongoDB中

二、整体思路

由于TESLA集群无法直接操作MongoDB,需要将TDW里面的用户画像数据,通过洛子系统导出至HDFS,再与MongoDB中原有群画像进行合并。

整体流程

三、算法流程

算法流程图

四、核心代码

#! /usr/bin/python2.7

# -*- coding: utf8 -*-

import decimal

import time

import math

import sys

import os

import param_map

from pymongo.collection import Collection

from decimal import Decimal

import datetime

reload(sys)

sys.setdefaultencoding("utf-8")

sys.path.append("../")

from utils import mongoUtils, confUtils

decimal.getcontext().prec = 6

BATCH_NUM = 100000

now_time = datetime.datetime.now()

delta = datetime.timedelta(days=30)

delta30 = now_time - delta

time_limit = int(time.mktime(delta30.timetuple()))

print(time_limit)

def split_uid_similarity(uid_num_str):

"""

拆分uid和相似度,并分别返回

:param uid_num_str:

:return:uid,相似度

"""

uid_num = uid_num_str.split(":")

return uid_num[0], float(uid_num[1])

def split_uid_sim_user(user_hd):

"""

拆分uid和相似人群,并分别返回

:param user_hd:

:return: uid,相似人群

"""

uid_sim_user = user_hd.strip().split("\t")

return uid_sim_user[0], uid_sim_user[1]

def dimension_profile_limit(dimension_profile, min_i, max_i, limit, cluster_profile_str):

"""

:param dimension_profile:

:param min_i:

:param max_i:

:param limit:

:param cluster_profile_str:

:return: 返回前limit个特征标签,并对特征权重进行映射

"""

if len(dimension_profile) != 0:

# 先排序

dimension_profile = sorted(dimension_profile.iteritems(), key=lambda c: c[1], reverse=True)

# 再对前limit条记录进行映射

size = limit if len(dimension_profile) > limit else len(dimension_profile)

for i in range(size):

tag = dimension_profile[i]

tag_id = tag[0]

tag_value = tag[1]

tag_value = max_i if tag_value > max_i else tag_value

if tag_value >= min_i:

cluster_profile_str = cluster_profile_str + str(tag_id) + ":" + str(tag_value) + ","

if len(dimension_profile) != 0:

# 假如长度不为0,将最后一个逗号删掉

cluster_profile_str = cluster_profile_str[:-1]

return cluster_profile_str

def cluster_profile_dic2list(cluster_profile, dimension_param_dic):

"""

相似用户群画像阈值过滤,dic->list

:param dimension_param_dic: 维度阈值

:return: 相似用户群特征list

:param cluster_profile:群体画像

"""

cluster_profile_str = ""

if len(cluster_profile) == 0:

return None

for key, dimension_profile in cluster_profile.items():

# 取出维度的阈值

dimention_param = dimension_param_dic.get(str(key))

if dimention_param is not None:

min_i = dimention_param.get("min")

max_i = dimention_param.get("max")

limit = dimention_param.get("limit")

if dimension_profile is not None:

cluster_profile_str = dimension_profile_limit(dimension_profile, min_i, max_i, limit,

cluster_profile_str)

# values为不为None 都需要追加一个分号

cluster_profile_str = cluster_profile_str + ";"

cluster_profile_list = cluster_profile_str[:-1].split(";")

return cluster_profile_list

def sim_users_dic2list(cluster_dic, sim_users_max_size):

"""

# 相似人群数量限制,dic->list

:param sim_users_max_size: 相似人群的最大值

:type cluster_dic: 字典表

:param cluster_dic:相似人群字典表

:return: 相似度最高的相似人群

"""

user_similarity_list = sorted(cluster_dic.iteritems(), key=lambda b: b[1], reverse=True)

sim_users_s = ""

i = 0

new_cluster_dic = {}

for i in range(len(user_similarity_list)):

if i < sim_users_max_size:

user_similarity = user_similarity_list[i]

key = user_similarity[0]

value = user_similarity[1]

new_cluster_dic[key] = value

sim_users_s = sim_users_s + key + ":" + str(value) + ","

else:

break

i = i + 1

sim_users_list = sim_users_s[:-1].split(",")

return sim_users_list, new_cluster_dic

class ClusterProfileComputer(object):

cf = confUtils.getConfig("../conf/setting.conf")

def __init__(self, environment):

self.xw_database, self.xw_client = mongoUtils.getMongodb("XW")

self.pac_database, self.pac_client = mongoUtils.getMongodb("PAC")

self.om_database, self.pac_client = mongoUtils.getMongodb("OM")

item = "LOCAL_SIM_USERS_PATH" if environment == "local" else "SIM_USERS_PATH"

self.sim_users_path = confUtils.getFilePath(self.cf, "SIM_USERS", item)

self.decay_factor = param_map.SIM_USERS_PARAM.get("decay_factor")

self.sim_users_max_size = param_map.SIM_USERS_PARAM.get("sim_users_max_size")

self.similarity_low = param_map.SIM_USERS_PARAM.get("similarity_low")

self.similarity_high = param_map.SIM_USERS_PARAM.get("similarity_high")

@staticmethod

def basic_cursor2dic(platform, mongodb_cursor):

"""

mongodb取出的基础画像存到字典表

:param platform: 平台

:param mongodb_cursor:

:return:

"""

users_profile_map = {}

for user_profile in mongodb_cursor:

_uid = user_profile["name"] if platform == "PAC" else user_profile["_id"]

users_profile_map[_uid] = user_profile

return users_profile_map

@staticmethod

def get_sim_users_profile(all_users_profile, users_similarity):

"""

:param all_users_profile:

:param users_similarity:

:return:相似人群的画像

"""

rs = []

for uid_similarity in users_similarity:

uid, similarity = split_uid_similarity(uid_similarity)

profile = all_users_profile.get(uid)

if profile is not None:

rs.append(profile)

return rs

def dump_basic_profile(self, all_uid, batch_num, platform, profile_collection):

# type: (list, int) -> dict

"""

:return: 平台基础画像

:param platform: 平台

:return: 基础画像字典表

:param profile_collection: 数据库集合

:param all_uid:用户的编号列表

:type batch_num: int

"""

rs = {}

# 数据库查询所有人群用户画像,此画像中没有相似人群

for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):

key = "name" if platform == "PAC" else "_id"

cursor = profile_collection.find({"$and": [{key: {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},

{"update_time": {"$gt": time_limit}}]}, no_cursor_timeout=True)

rs.update(self.basic_cursor2dic(platform, cursor))

cursor.close()

return rs

def compute_single_file(self, path, xw_profile_collection, pac_profile_collection, om_profile_collection):

users = open(path)

all_uid_list = []

uid_sim_map = {}

# uid_sim_map["1_291083852"] = ["1_757155427:8"]

for user_str in users:

# 从hdfs中取出udi的相似人群

uid_hf, sim_users_hd = split_uid_sim_user(user_str)

uid_sim_map[uid_hf] = sim_users_hd.split(",")

all_uid_list.append(uid_hf)

print("uid_sim_map : %d" % len(uid_sim_map))

# 数据库查询所有用户的基础画像,此画像中没有相似人群

platform_basic_profile_dic = {}

xw_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "XW", xw_profile_collection)

platform_basic_profile_dic["XW"] = xw_users_basic_profile_map

pac_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "PAC", pac_profile_collection)

platform_basic_profile_dic["PAC"] = pac_users_basic_profile_map

om_users_basic_profile_map = self.dump_basic_profile(all_uid_list, BATCH_NUM, "OM", om_profile_collection)

platform_basic_profile_dic["OM"] = om_users_basic_profile_map

# print("dump basic profile %d records" % len(pac_all_users_profile_map))

# 数据库查询相似人群画像

cluster_profile_collection = self.xw_database.get_collection(

param_map.MONGODB_CLUSTER_PROFILE_MAP["Cluster"]) # type: Collection

old_cluster_profile_map = dump_cluster_profile_history(self, all_uid_list, cluster_profile_collection,

BATCH_NUM)

print("dump cluster profile %d records" % len(old_cluster_profile_map))

#index = 0

for uid, sim_users_list in uid_sim_map.items():

print ("uid = %s" % uid)

# 合并新老相似人群,并使用衰减因子来计算相似度

users_similarity_dic = merge_sim_users(uid, sim_users_list, self.decay_factor, self.similarity_low,

self.similarity_high, old_cluster_profile_map)

# 相似人群---->将字典表转化为list,存储mongodb

sim_users_list, users_similarity_dic = sim_users_dic2list(users_similarity_dic, self.sim_users_max_size)

print("similar people len: %d" % len(sim_users_list))

platform_cluster_profile_list = []

for platform_name, platform_basic_profile in platform_basic_profile_dic.items():

# 取出用户i相似人群的画像

sim_users_profile_list = self.get_sim_users_profile(platform_basic_profile, sim_users_list)

cluster_profile_dic = cluster_profile_compute(platform_name, sim_users_profile_list,

users_similarity_dic)

# 结果区间映射,相似人群画像特征----->字典表转list,便于存储mongodb

cluster_profile_list = cluster_profile_dic2list(cluster_profile_dic, param_map.DIMENSION_PARAM)

platform_cluster_profile_list.append(cluster_profile_list)

xw_cluster_profile = platform_cluster_profile_list[0]

pac_cluster_profile = platform_cluster_profile_list[1]

om_cluster_profile = platform_cluster_profile_list[2]

old_profile = cluster_profile_collection.find_one({"_id": uid})

if old_profile is None:

create_time = int(time.time())

else:

create_time = old_profile["create_time"]

document = ({"_id": uid, "sim_users": sim_users_list, "xw_cluster_profile": xw_cluster_profile,

"pac_cluster_profile": pac_cluster_profile, "om_cluster_profile": om_cluster_profile,

"create_time": create_time,

"update_time": int(time.time())})

cluster_profile_collection.save(document)

#if index >= 100:

# break

#index = index + 1

print("end")

users.close()

def run(self):

# 相似人群HDFS

xw_profile_collection = self.xw_database.get_collection(param_map.MONGODB_PROFILE_MAP["XW"])

pac_profile_collection = self.pac_database.get_collection(param_map.MONGODB_PROFILE_MAP["PAC"])

om_profile_collection = self.om_database.get_collection(param_map.MONGODB_PROFILE_MAP["OM"])

for dir_path, dir_names, file_names in os.walk(self.sim_users_path):

print(dir_names)

for file_name in file_names:

if "attempt_" in file_name:

print(file_name)

path = os.path.join(dir_path, file_name)

pute_single_file(path, xw_profile_collection, pac_profile_collection, om_profile_collection)

def accumulate_dimension_profile(cluster_dimension_feature, user_dimension, ratio):

"""

将user指定维度的特征累加到群画像

:param cluster_dimension_feature:群画像某个维度的特征

:param user_dimension:用户某个维度的特征

:param ratio:user的权重,公式为相似度/(相似度+10),区间为(1/3,10/11)

:return:指定维度的群画像

"""

if user_dimension != "":

user_feature_list = user_dimension.split(",")

for feature in user_feature_list:

atom = feature.split(":")

if len(atom) == 2:

k = atom[0]

w = atom[1]

if cluster_dimension_feature.get(k) is None:

cluster_dimension_feature[k] = Decimal(w) * ratio

else:

cluster_dimension_feature[k] = Decimal(w) * ratio + Decimal(cluster_dimension_feature[k])

return cluster_dimension_feature

def dump_cluster_profile_history(self, all_uid, collection, batch_num):

rs = {}

for x in xrange(0, int(math.ceil(len(all_uid) / float(batch_num)))):

cursor = collection.find({'_id': {'$in': all_uid[x * batch_num:(x + 1) * batch_num]}},

no_cursor_timeout=True)

rs.update(cluster_cursor2dic(cursor))

cursor.close()

return rs

def cluster_cursor2dic(mongodb_cursor):

"""

mongodb取出的人群画像存到字典表

:param mongodb_cursor:

:return:

"""

users_profile_map = {}

for user_profile in mongodb_cursor:

_uid = user_profile["_id"]

users_profile_map[_uid] = user_profile

return users_profile_map

def merge_sim_users(uid_hdf, sim_users_new, decay_factor, similarity_low, similarity_high, old_cluster_profile_dic):

"""

合并相似人群

:param similarity_low: 相似度最低值

:param similarity_high: 相似度最高值

:param uid_hdf: 用户编号

:param sim_users_new: 最新的相似用户

:param decay_factor: 衰减因子

:param old_cluster_profile_dic:老群体画像

:return:最新的相似人群

"""

cluster_union_dic = {}

# 提取uid和相似度到字典表

for user_similarity in sim_users_new:

_uid, similarity = split_uid_similarity(user_similarity)

cluster_union_dic[_uid] = similarity

# 从mongodb中读取老画像

old = old_cluster_profile_dic.get(uid_hdf)

if old is not None:

sim_users_old = old['sim_users']

for uid_similarity_old in sim_users_old:

uid_similarity_old_list = uid_similarity_old.split(":")

if len(uid_similarity_old_list) == 2:

sim_uid_old = uid_similarity_old_list[0]

try:

weight_old = float(uid_similarity_old_list[1]) * float(decay_factor)

except IndexError:

pass

else:

if (cluster_union_dic.get(sim_uid_old) is None) and (weight_old >= similarity_low):

cluster_union_dic[sim_uid_old] = weight_old

else:

weight_new = weight_old + cluster_union_dic[sim_uid_old]

if weight_new > similarity_high:

weight_new = similarity_high

if weight_new < similarity_low:

del cluster_union_dic[sim_uid_old]

else:

cluster_union_dic[sim_uid_old] = weight_new

return cluster_union_dic

def cluster_profile_compute(platform, sim_users_profile_array, sim_users_dic):

# type: (String, list, dic) -> dic

"""

相似人群特征计算

:param platform:平台

:param sim_users_profile_array: 从mongodb中查出来的相似人群的画像

:param sim_users_dic: 相似人群的相似度字典表

:return: 相似人群画像字典表

"""

cluster_profile_rs = {}

for sim_user_obj in sim_users_profile_array:

key = "name" if platform == "PAC" else "_id"

sim_user_id = sim_user_obj.get(key)

# 获取两两用户的相似度

similarity = sim_users_dic.get(sim_user_id)

if similarity is not None:

sim_num = Decimal(similarity)

# 用户对应的权重

rate = Decimal(sim_num / (10 + sim_num))

# 取出某一个人的画像

profile = sim_user_obj.get("profile") if sim_user_obj.get("profile") is not None else ""

dimension_list = profile.split(";")

i = 0

for u_dimension in dimension_list:

# 获取群体维度i的特征

dimension_feature = cluster_profile_rs.get(i)

if dimension_feature is None:

dimension_feature = {}

# 更新维度i的特征

cluster_profile_rs[i] = accumulate_dimension_profile(dimension_feature, u_dimension, rate)

i = i + 1

return cluster_profile_rs

if __name__ == "__main__":

if len(sys.argv) == 2:

env = sys.argv[1]

else:

env = "local"

computer = ClusterProfileComputer(env)

computer.run()

欢迎大家前往腾讯云+社区或关注云加社区微信公众号(QcloudCommunity),第一时间获取更多海量技术实践干货哦~

海量技术实践经验,尽在云加社区!

如果觉得《消费者人群画像 python_如何正确打开相似人群画像算法》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。