I have some code that reads data from a queue. I want the function to take a callback as a parameter; the callback contains the code that decides what to do with each piece of data extracted from the queue. The type of the getMessages function is:

getMessages :: (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad (Either Error Result)) -> KafkaMonad (Either Error ())

So the callback always needs to have the type

(ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad (Either Error Result))

Although this code works, I would like a more generic type for the getMessages function, something like

getMessages :: (ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad a) -> KafkaMonad (Either Error ())
getMessages ::  f -> KafkaMonad (Either Error ())

When I try to use one of these types, for example with KafkaMonad a, the compiler reports this error:

Couldn't match type ‘a’ with ‘Either Error b0’ ‘a’ is a rigid type variable bound by the type signature for: 
getMessages :: forall a. Callback a -> KafkaMonad a at src/Kafka/Consume.hs:48:17 
Expected type: KafkaMonad (Either KafkaError b0) 
Actual type: KafkaMonad a 

The related functions are as follows

type Callback a = ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad a

getMessages :: Callback a -> KafkaMonad a
getMessages callback = do
    let ....
        runHandler (Right kc) = processMessages kc callback
    bracket mkConsumer clConsumer runHandler

processMessages :: Kc -> Callback a -> KafkaMonad a
processMessages k callback = do
    mapM_ (\_ -> do
        ecr <- pollMessage k (Timeout 1000)
        case ecr of
          Right cr -> do
            err <- commitAllOffsets OffsetCommit k
            case err of
              Nothing  -> callback cr
              Just err -> throwIO err
          Left err -> return $ Left $ KafkaError "Procesing Stopped"
      ) [0 :: Integer ..]
    return $ Right ()

What sort of class constraints can I use to achieve this? Any other solution would be great too.

1 Answer

chepner On

You can define a type alias parameterized over the type wrapped by KafkaMonad:

type CallbackType a = ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad a

Then you can define getMessages using this type as well.

getMessages :: CallbackType a -> KafkaMonad (Either Error ())

Note that this type annotation dictates what the definition of getMessages can be: you can't make any assumptions about what a might be. If there are any restrictions (imposed by KafkaMonad or by a required definition of getMessages), you'll need to specify those in your question.
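To illustrate what "no assumptions about a" means in practice, here is a minimal sketch that compiles with base alone. It is not the real Kafka code: KafkaMonad is assumed to be IO, Record is a hypothetical stand-in for ConsumerRecord (Maybe ByteString) (Maybe ByteString), and the records arrive as a plain list instead of being polled. Because a is fully polymorphic, about the only thing getMessages can do with a callback result is discard it.

```haskell
import Data.Foldable (traverse_)
import Data.Functor (void)

-- Assumption: the real KafkaMonad is replaced by IO for this sketch.
type KafkaMonad = IO

-- Hypothetical stand-in for ConsumerRecord (Maybe ByteString) (Maybe ByteString).
data Record = Record { recKey :: Maybe String, recValue :: Maybe String }

newtype Error = Error String deriving (Show, Eq)

type CallbackType a = Record -> KafkaMonad a

-- 'a' is unconstrained, so the result of each callback is simply discarded.
-- The list argument is an assumption; the original polls a consumer instead.
getMessages :: [Record] -> CallbackType a -> KafkaMonad (Either Error ())
getMessages records callback = do
  traverse_ (void . callback) records
  pure (Right ())

main :: IO ()
main = do
  let sample = [Record (Just "k1") (Just "v1"), Record Nothing Nothing]
  -- The same getMessages accepts callbacks with different result types.
  r1 <- getMessages sample (print . recValue)                         -- a is ()
  r2 <- getMessages sample (\r -> pure (maybe 0 length (recValue r))) -- a is Int
  print (r1, r2)
```

The type error in the question arises as soon as the body tries to treat the callback result as an Either, which this version never does.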

Based on your error message, you have unstated assumptions about what type is wrapped by KafkaMonad, which prevent you from fully parameterizing the callback type. Try

type CallbackType a = ConsumerRecord (Maybe ByteString) (Maybe ByteString) -> KafkaMonad (Either KafkaError a)

or modify your definition to remove the assumptions.
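As a rough sketch of the first option, the version below keeps Either in the callback's result and leaves only the success type polymorphic. The same assumptions apply as in any standalone example: KafkaMonad is taken to be IO, Record and Error are hypothetical stand-ins for the real Kafka types, and the records come from a list instead of a polling loop.

```haskell
-- Assumption: the real KafkaMonad is replaced by IO for this sketch.
type KafkaMonad = IO

-- Hypothetical stand-ins for the real record and error types.
data Record = Record { recKey :: Maybe String, recValue :: Maybe String }

newtype Error = Error String deriving (Show, Eq)

-- Only the success type 'a' is polymorphic; the Either wrapper is fixed.
type CallbackType a = Record -> KafkaMonad (Either Error a)

-- Runs the callback on each record and stops at the first Left.
-- Successful results of type 'a' are ignored, since nothing is known about them.
getMessages :: [Record] -> CallbackType a -> KafkaMonad (Either Error ())
getMessages [] _ = pure (Right ())
getMessages (r : rs) callback = do
  result <- callback r
  case result of
    Left err -> pure (Left err)
    Right _  -> getMessages rs callback

main :: IO ()
main = do
  let records = [Record (Just "k1") (Just "v1"), Record (Just "k2") Nothing]
      -- Example callback: succeeds with the value length, fails on a missing value.
      callback r = case recValue r of
        Just v  -> pure (Right (length v))
        Nothing -> pure (Left (Error "missing value"))
  outcome <- getMessages records callback
  print outcome
```

Here getMessages may inspect the Either layer because the alias guarantees it is there, while the caller is still free to pick any success type.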