失眠网,内容丰富有趣,生活中的好帮手!
失眠网 > FLASHDB的tsdb时序数据库代码分析和问题解决

FLASHDB的tsdb时序数据库代码分析和问题解决

时间:2024-05-21 19:39:36

相关推荐

FLASHDB的tsdb时序数据库代码分析和问题解决

1、背景

好好看看FlashDB,在使用过程中读取时存在不太好用的地方,因此深入学习下FlashDB的TSDB的相关代码,看看如何解决工程中的问题。主要我每次取的数据是有上限的,而读取API不能读取条数,都是采用迭代方式,不适合我的项目工程。看看采集什么方式比较好,如果有大牛有好方法,请在评论区里指点下。

1.1 参考资料

FlashDB嵌入式数据库之TSDB数据存储解析_¥风笛¥的博客-CSDN博客

typedef struct fdb_db *fdb_db_t;struct fdb_db {const char *name; /**< database name */fdb_db_type type; /**< database type */union {#ifdef FDB_USING_FAL_MODEconst struct fal_partition *part; /**< flash partition for saving database */#endif#ifdef FDB_USING_FILE_MODEconst char *dir;/**< directory path for saving database */#endif} storage;uint32_t sec_size; /**< flash section size. It's a multiple of block size */uint32_t max_size; /**< database max size. It's a multiple of section size */bool init_ok; /**< initialized successfully */bool file_mode;/**< is file mode, default is false */bool not_formatable;/**< is can NOT be formated mode, default is false */#ifdef FDB_USING_FILE_MODE#if defined(FDB_USING_FILE_POSIX_MODE)int cur_file; /**< current file object */#elif defined(FDB_USING_FILE_LIBC_MODE)FILE *cur_file;/**< current file object */#endifuint32_t cur_sec; /**< current operate sector address */#endifvoid (*lock)(fdb_db_t db); /**< lock the database operate */void (*unlock)(fdb_db_t db); /**< unlock the database operate */void *user_data;};

2、时间序列数据库分析

时间序列数据库三个重要结构体,分别为时间序数据库对象、时间序列数据库扇区信息、时间序列数据节点索引日志。

/* TSDB structure 时间序列数据库对象结构体*/struct fdb_tsdb {struct fdb_db parent; /**< inherit from fdb_db */struct tsdb_sec_info cur_sec;/**< current using sector 当前正在使用的扇区信息*/fdb_time_t last_time; /**< last TSL timestamp 最新数据时间戳*/fdb_get_time get_time; /**< the current timestamp get function */size_t max_len;/**< the maximum length of each log */uint32_t oldest_addr; /**< the oldest sector start address */bool rollover; /**< the oldest data will rollover by newest data, default is true */void *user_data;};typedef struct fdb_tsdb *fdb_tsdb_t;/* TSDB section information 时间序列数据库扇区信息结构体*/struct tsdb_sec_info {bool check_ok; /**< sector header check is OK */fdb_sector_store_status_t status; /**< sector store status 第几个状态了@see fdb_sector_store_status_t */uint32_t addr; /**< sector start address */uint32_t magic;/**< magic word(`T`, `S`, `L`, `0`) */fdb_time_t start_time; /**< the first start node's timestamp, 0xFFFFFFFF: unused */fdb_time_t end_time;/**< the last end node's timestamp, 0xFFFFFFFF: unused */uint32_t end_idx; /**< the last end node's index, 0xFFFFFFFF: unused */fdb_tsl_status_t end_info_stat[2]; /**< the last end node's info status */size_t remain; /**< remain size */uint32_t empty_idx;/**< the next empty node index address */uint32_t empty_data;/**< the next empty node's data end address */};typedef struct tsdb_sec_info *tsdb_sec_info_t;/*扇区头数据*/struct sector_hdr_data {uint8_t status[FDB_STORE_STATUS_TABLE_SIZE]; /**< sector store status存储的状态4个,倒过来读取,看看最新到哪个状态了 @see fdb_sector_store_status_t */uint32_t magic;/**< magic word(`T`, `S`, `L`, `0`) */fdb_time_t start_time; /**< the first start node's timestamp */struct {fdb_time_t time;/**< the last end node's timestamp */uint32_t index;/**< the last end node's index */uint8_t status[TSL_STATUS_TABLE_SIZE]; /**< end node status, @see fdb_tsl_status_t */} end_info[2];uint32_t reserved;};typedef struct sector_hdr_data *sector_hdr_data_t;/* time series log node index data 时间序列日志节点数据*/struct log_idx_data {uint8_t status_table[TSL_STATUS_TABLE_SIZE]; /**< node status, @see fdb_tsl_status_t */fdb_time_t time; /**< node timestamp */uint32_t log_len; /**< node total length (header + name + value), must align by FDB_WRITE_GRAN */uint32_t log_addr; /**< node address */};typedef struct log_idx_data *log_idx_data_t;

对于一个扇区的数据,前40个字节存放扇区信息--struct sector_hdr_data,接着是存放节点log信息---struct log_idx_data,数据信息从扇区的结尾开始存放---长度由对应的存放节点log信息中的长度tsl.log_len确定。节点log信息向后依次存放,节点数据信息从扇区尾向前依次存放。

主要研究数据的存储或读取。首先看存储数据

2.1 存储数据

fdb_err_t fdb_tsl_append(fdb_tsdb_t db, fdb_blob_t blob)

最重要的调用tsl_append

fdb_err_t tsl_append(fdb_tsdb_t db, fdb_blob_t blob){fdb_err_t result = FDB_NO_ERR;fdb_time_t cur_time = db->get_time();FDB_ASSERT(blob->size <= db->max_len);/* check the current timestamp, MUST more than the last save timestamp */if (cur_time < db->last_time) {FDB_INFO("Warning: current timestamp (%" PRIdMAX ") is less than the last save timestamp (%" PRIdMAX "). This tsl will be dropped.\n",(intmax_t )cur_time, (intmax_t )(db->last_time));return FDB_WRITE_ERR;}result = update_sec_status(db, &db->cur_sec, blob, cur_time); //扇区信息更新if (result != FDB_NO_ERR) {return result;}/* write the TSL node */result = write_tsl(db, blob, cur_time); //写时间序列数据if (result != FDB_NO_ERR) {return result;}//更新DB中关于当前扇区结束node的信息和其他信息/* recalculate the current using sector info */ db->cur_sec.end_idx = db->cur_sec.empty_idx;db->cur_sec.end_time = cur_time;db->cur_sec.empty_idx += LOG_IDX_DATA_SIZE;db->cur_sec.empty_data -= FDB_WG_ALIGN(blob->size);db->cur_sec.remain -= LOG_IDX_DATA_SIZE + FDB_WG_ALIGN(blob->size);db->last_time = cur_time; //更新最新数据时间戳return result;}

时间序列数据库扇区信息对于扇区状态为

FDB_SECTOR_STORE_EMPTY:

在更新扇区状态的函数中更新数据状态位FDB_SECTOR_STORE_USING,将flash中相关信息也更新一下,包括status_table[0]和start_time两个字段。

FDB_SECTOR_STORE_FULL:

直接返回FDB_SAVED_FULL,表示扇区满了。

FDB_SECTOR_STORE_USING:

扇区正在使用且剩余空间不足(小于LOG_IDX_DATA_SIZE+存储数据的大小),存储结束node的索引号和时间戳

static fdb_err_t update_sec_status(fdb_tsdb_t db, tsdb_sec_info_t sector, fdb_blob_t blob, fdb_time_t cur_time)

result = update_sec_status(db, &db->cur_sec, blob, cur_time);

---》

uint8_t status[FDB_STORE_STATUS_TABLE_SIZE];

......

_FDB_WRITE_STATUS(db, sector->addr, status, FDB_SECTOR_STORE_STATUS_NUM, FDB_SECTOR_STORE_USING, true);

/* save the start timestamp */

FLASH_WRITE(db, sector->addr + SECTOR_START_TIME_OFFSET, (uint32_t *)&cur_time, sizeof(fdb_time_t), true);

-----》

fdb_err_t _fdb_write_status(fdb_db_t db, uint32_t addr, uint8_t status_table[], size_t status_num, size_t status_index, bool sync)

....

result = _fdb_flash_write(db, addr + byte_index, (uint32_t *) &status_table[byte_index], FDB_WRITE_GRAN / 8, sync); //实际上就是status[0] = 0x00;

2.2读取数据

enum fdb_tsl_status {FDB_TSL_UNUSED,FDB_TSL_PRE_WRITE,FDB_TSL_WRITE,FDB_TSL_USER_STATUS1,FDB_TSL_DELETED,FDB_TSL_USER_STATUS2,#define FDB_TSL_STATUS_NUM 6};typedef enum fdb_tsl_status fdb_tsl_status_t;/* time series log node object */struct fdb_tsl {fdb_tsl_status_t status; /**< 节点状态node status, @see fdb_log_status_t */fdb_time_t time; /**< 节点时间戳node timestamp */uint32_t log_len; /**< 节点长度,必须对齐log length, must align by FDB_WRITE_GRAN */struct {uint32_t index;/**< node索引的地址node index address */uint32_t log; /**< log数据地址log data address */} addr; /*地址信息*/};typedef struct fdb_tsl *fdb_tsl_t;

读取数据,从分析fdb_tsl_iter_by_time开始

/*** The TSDB iterator for each TSL by timestamp.** @param db database object* @param from starting timestamp. It will be a reverse iterator when ending timestamp less than starting timestamp* @param to ending timestamp* @param cb callback* @param arg callback argument*/void fdb_tsl_iter_by_time(fdb_tsdb_t db, fdb_time_t from, fdb_time_t to, fdb_tsl_cb cb, void *cb_arg){struct tsdb_sec_info sector;uint32_t sec_addr, start_addr, traversed_len = 0;struct fdb_tsl tsl;bool found_start_tsl = false;//定义了两个函数指针uint32_t (*get_sector_addr)(fdb_tsdb_t , tsdb_sec_info_t , uint32_t);uint32_t (*get_tsl_addr)(tsdb_sec_info_t , fdb_tsl_t);if (!db_init_ok(db)) {FDB_INFO("Error: TSL (%s) isn't initialize OK.\n", db_name(db));}/*正序的*/if(from <= to) {start_addr = db->oldest_addr; //开始就是最老数据get_sector_addr = get_next_sector_addr; //函数指针赋值 下一个扇区get_tsl_addr = get_next_tsl_addr; //函数指针赋值 下一个时间序列log地址} else { //倒序start_addr = db->cur_sec.addr; //开始就是最新的数据get_sector_addr = get_last_sector_addr; //函数指针赋值--上一个扇区get_tsl_addr = get_last_tsl_addr; //函数指针赋值---上一个时间序列log地址}// FDB_INFO("from %s", ctime((const time_t * )&from));// FDB_INFO("to %s", ctime((const time_t * )&to));if (cb == NULL) {return;}sec_addr = start_addr; /*扇区内起始地址*/db_lock(db);/* search all sectors */do {traversed_len += db_sec_size(db); //加上一个扇区的大小/*根据提供的扇区地址获取扇区信息,填充到sector中去*/if (read_sector_info(db, sec_addr, &sector, false) != FDB_NO_ERR) {continue;}/* sector has TSL */if ((sector.status == FDB_SECTOR_STORE_USING || sector.status == FDB_SECTOR_STORE_FULL)) {if (sector.status == FDB_SECTOR_STORE_USING) {/* copy the current using sector status */sector = db->cur_sec;}if ((found_start_tsl)|| (!found_start_tsl &&((from <= to && ((sec_addr == start_addr && from <= sector.start_time) || from <= sector.end_time)) ||(from > to && ((sec_addr == start_addr && from >= sector.end_time) || from >= sector.start_time))))) {//第一个节点索引地址 索引的最后一个地址uint32_t start = sector.addr + SECTOR_HDR_DATA_SIZE, end = sector.end_idx;found_start_tsl = true;/* search the first start TSL address */tsl.addr.index = search_start_tsl_addr(db, start, end, from, to);/* search all TSL */do {//读取节点索引logread_tsl(db, &tsl);if (tsl.status != FDB_TSL_UNUSED) {//争取if ((from <= to && tsl.time >= from && tsl.time <= to)|| (from > to && tsl.time <= from && tsl.time >= to)) {/* iterator is interrupted when callback return true */if (cb(&tsl, cb_arg)) {goto __exit;}} else {goto __exit;}}} while ((tsl.addr.index = get_tsl_addr(&sector, &tsl)) != FAILED_ADDR);}} else if (sector.status == FDB_SECTOR_STORE_EMPTY) {goto __exit;}} while ((sec_addr = get_sector_addr(db, &sector, traversed_len)) != FAILED_ADDR);__exit:db_unlock(db);}

重点来看search_start_tsl_addr这个查找函数

/** Found the matched TSL address.*/static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to){struct fdb_tsl tsl;while (true) {/*二分法*/tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);read_tsl(db, &tsl);if (tsl.time < from) {start = tsl.addr.index + LOG_IDX_DATA_SIZE;} else if (tsl.time > from) {end = tsl.addr.index - LOG_IDX_DATA_SIZE;} else {return tsl.addr.index;}if (start > end) {if (from > to) {tsl.addr.index = start;read_tsl(db, &tsl);if (tsl.time > from) {start -= LOG_IDX_DATA_SIZE;}}break;}}return start;}

到这里,我已经获取到自己想要的,提供一个最大的迭代次数,到达迭代次数直接结束迭代,就可以完成我所需要的。

3、调试中的问题

感觉FlashDB源码中存在一个BUG,二分查找有点问题。

/* Return the most contiguous size aligned at specified width. RT_ALIGN(13, 4)* would return 16.*/#define FDB_ALIGN(size, align)(((size) + (align) - 1) & ~((align) - 1))/** Found the matched TSL address.*/static int search_start_tsl_addr(fdb_tsdb_t db, int start, int end, fdb_time_t from, fdb_time_t to){struct fdb_tsl tsl;while (true) {tsl.addr.index = start + FDB_ALIGN((end - start) / 2, LOG_IDX_DATA_SIZE);

这里应该是二分查找,找中间值,不是对齐,因此改为

tsl.addr.index = start + ((end - start) /(2 * LOG_IDX_DATA_SIZE))*LOG_IDX_DATA_SIZE;

效果还可以。

可能是我没有理解这个宏函数,懂得评论下。

如果觉得《FLASHDB的tsdb时序数据库代码分析和问题解决》对你有帮助,请点赞、收藏,并留下你的观点哦!

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。