support different type ops concurrent fetch data from one channel && TODO: some op likes combine_op should get data with the sanme data_id not support yet