Après la publication de notre addon InfluxDB, vous vous demandez peut-être si une base de données de séries temporelles peut correspondre à vos besoins. Dans cet article de blog, nous allons expliquer un exemple d'utilisation typique d'une base de données de séries temporelles.
Un exemple d'utilisation
Voyons un exemple simple d'application utilisant InfluxDB comme base de données de séries temporelles. Cette application surveille un hashtag sur Twitter et compte combien de tweets sont publiés avec ce hashtag chaque minute. Les résultats sont affichés sur un graphique disponible ici.
Dans cet exemple, différentes technologies sont utilisées. Le backend est construit avec :
La merveilleuse base de données de séries temporelles InfluxDB pour stocker un nouveau point de données pour chaque tweet contenant le hashtag surveillé
Un worker Go qui utilise la go-twitter bibliothèque pour s'abonner à un flux public Twitter.
Un serveur web Go utilisant le framework Martini.
Le frontend utilise les technologies suivantes :
JavaScript pur pour afficher les résultats sur un graphique généré à l'aide de la bibliothèque C3.js.
Vous pouvez voir l'application ici et lire le code sur GitHub. Voici une capture d'écran de cet exemple :

Comment InfluxDB est utilisé dans cet exemple
Les données dans InfluxDB sont organisées par séries temporelles, chacune d'elles contenant une valeur mesurée. Dans cet exemple, la mesure est le nombre de tweets, les tags sont le type de tweet (c'est-à-dire tweet, DM ou événement) et le hashtag (afin que nous puissions surveiller plusieurs hashtags), et le seul champ est la mesure.
Chaque fois qu'un tweet est publié contenant le hashtag surveillé, nous ajoutons un point de données à InfluxDB en utilisant la date de création du tweet fournie par l'API Twitter comme horodatage. Cette partie est expliquée plus en détail dans la section suivante.
Maintenant, dans le frontend, nous voulons afficher le nombre de tweets chaque minute au cours de la dernière heure. Pour récupérer des données de la base de données, InfluxDB fournit InfluxQL, un langage semblable à SQL. Dans notre cas, la requête est facilement interprétée par quiconque comprenant SQL :
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
SELECT sum(value) FROM tweets WHERE hashtag = 'RT' AND time >= now() - 1h GROUP BY time(1m) ORDER BY time DESC LIMIT 60
value contient le nombre de tweets avec le hashtag spécifié chaque seconde.
WHERE hashtag = 'RT' AND time >= now() - 1h filtre le hashtag spécifié et uniquement pour l'heure passée.
GROUP BY time(1m) détermine comment InfluxDB va regrouper les résultats de la requête dans le temps. Ici, nous avons la somme des valeurs pour chaque minute. Plus d'informations dans la documentation d'InfluxDB.
ORDER BY time DESC LIMIT 60 pour obtenir les 60 derniers résultats. Avec le précédent GROUP BY, cela garantit que nous obtenons les résultats pour l'heure passée.
Vous pouvez exécuter cette requête en utilisant la console InfluxDB. Vous pouvez obtenir cette console avec le Scaling CLI :
scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console
scalingo --app sample-influxdb influxdb-console
InfluxDB fournit également un moyen de supprimer automatiquement les enregistrements plus anciens qu'une certaine date. Comme nous n'avons pas besoin des données plus anciennes qu'une heure, nous créons une politique de rétention par défaut d'une heure :
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
CREATE RETENTION POLICY "one_hour" ON "tweets" DURATION 1h REPLICATION 1 DEFAULT
Créer une politique de rétention n'est pas disponible pour les utilisateurs de base de données simples. Si vous souhaitez en créer une, vous pouvez vous connecter à votre tableau de bord de base de données et aller à l'onglet "Avancé".

Plus d'informations sur le downsampling et la rétention des données sont disponibles dans la documentation.
La partie Go de l'exemple
InfluxDB fournit une bibliothèque officielle pour interagir avec un serveur InfluxDB. Nous allons montrer comment nous l'avons utilisée dans le contexte de cet exemple. En utilisant cette bibliothèque, les données de séries temporelles sont appelées points. Ces points sont écrits dans la base de données à l'aide d'inserts par lots.
Chaque fois qu'un tweet est publié en utilisant le hashtag surveillé, nous incrémentons un compteur et ajoutons ou mettons à jour un point dans la base de données. Le snippet suivant montre la fonction appelée chaque fois qu'un tweet est publié. Ce code appartient au worker de notre exemple.
Pour plus de clarté, nous avons sauté tous les contrôles d'erreur dans ces extraits de code. N'oubliez pas de les ajouter dans votre code de production...
func addTweet(createdAt, type string) {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: config.InfluxConnectionInformation.Database,
Precision: "s",
})
pt, _ := influx.NewPoint(
"tweets",
map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
map[string]interface{}{"value": nbTweetsCurrentSecond},
createdAt
)
bp.AddPoint(pt)
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
client.Write(bp
func addTweet(createdAt, type string) {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: config.InfluxConnectionInformation.Database,
Precision: "s",
})
pt, _ := influx.NewPoint(
"tweets",
map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
map[string]interface{}{"value": nbTweetsCurrentSecond},
createdAt
)
bp.AddPoint(pt)
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
client.Write(bp
func addTweet(createdAt, type string) {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: config.InfluxConnectionInformation.Database,
Precision: "s",
})
pt, _ := influx.NewPoint(
"tweets",
map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
map[string]interface{}{"value": nbTweetsCurrentSecond},
createdAt
)
bp.AddPoint(pt)
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
client.Write(bp
func addTweet(createdAt, type string) {
bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
Database: config.InfluxConnectionInformation.Database,
Precision: "s",
})
pt, _ := influx.NewPoint(
"tweets",
map[string]string{"type": type, "hashtag": config.E["HASHTAG"]},
map[string]interface{}{"value": nbTweetsCurrentSecond},
createdAt
)
bp.AddPoint(pt)
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
client.Write(bp
La variable config.InfluxConnectionInformation est de type *InfluxInfo. Nous l'avons remplie en analysant la variable d'environnement SCALINGO_INFLUX_URL avec la fonction suivante :
func parseConnectionString(con string) (*InfluxInfo, error) {
url, err := url.Parse(con)
if err != nil {
return nil, errgo.Mask(err)
}
var password, username string
if url.User != nil {
password, _ = url.User.Password()
username = url.User.Username()
}
return &InfluxInfo{
Host: url.Scheme + "://" + url.Host,
User: username,
Password: password,
Database: url.Path[1:],
ConnectionString: con,
}, nil
func parseConnectionString(con string) (*InfluxInfo, error) {
url, err := url.Parse(con)
if err != nil {
return nil, errgo.Mask(err)
}
var password, username string
if url.User != nil {
password, _ = url.User.Password()
username = url.User.Username()
}
return &InfluxInfo{
Host: url.Scheme + "://" + url.Host,
User: username,
Password: password,
Database: url.Path[1:],
ConnectionString: con,
}, nil
func parseConnectionString(con string) (*InfluxInfo, error) {
url, err := url.Parse(con)
if err != nil {
return nil, errgo.Mask(err)
}
var password, username string
if url.User != nil {
password, _ = url.User.Password()
username = url.User.Username()
}
return &InfluxInfo{
Host: url.Scheme + "://" + url.Host,
User: username,
Password: password,
Database: url.Path[1:],
ConnectionString: con,
}, nil
func parseConnectionString(con string) (*InfluxInfo, error) {
url, err := url.Parse(con)
if err != nil {
return nil, errgo.Mask(err)
}
var password, username string
if url.User != nil {
password, _ = url.User.Password()
username = url.User.Username()
}
return &InfluxInfo{
Host: url.Scheme + "://" + url.Host,
User: username,
Password: password,
Database: url.Path[1:],
ConnectionString: con,
}, nil
Enfin, le frontend de notre application a besoin d'un moyen pour lire les données de la base de données. Nous avons ajouté un endpoint GET /tweets dans le serveur web qui retourne le nombre de tweets chaque minute pour la dernière heure. La réponse est encodée en JSON pour s'intégrer parfaitement avec la bibliothèque JavaScript que nous utilisons.
Ce point de terminaison construit et exécute la requête InfluxQL :
func LastMinutesTweets() ([]InfluxValue, error) {
queryString := "SELECT SUM('value') FROM 'tweets'"
queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
queryString += " AND time >= now() - 60m"
queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"
return executeQuery(queryString)
}
func executeQuery(queryString string) ([]InfluxValue, error) {
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
response, _ := client.Query(influx.Query{
Command: queryString,
Database: config.InfluxConnectionInformation.Database,
})
if response.Error() != nil {
panic(response.Error())
}
return convertResults(response
func LastMinutesTweets() ([]InfluxValue, error) {
queryString := "SELECT SUM('value') FROM 'tweets'"
queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
queryString += " AND time >= now() - 60m"
queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"
return executeQuery(queryString)
}
func executeQuery(queryString string) ([]InfluxValue, error) {
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
response, _ := client.Query(influx.Query{
Command: queryString,
Database: config.InfluxConnectionInformation.Database,
})
if response.Error() != nil {
panic(response.Error())
}
return convertResults(response
func LastMinutesTweets() ([]InfluxValue, error) {
queryString := "SELECT SUM('value') FROM 'tweets'"
queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
queryString += " AND time >= now() - 60m"
queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"
return executeQuery(queryString)
}
func executeQuery(queryString string) ([]InfluxValue, error) {
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
response, _ := client.Query(influx.Query{
Command: queryString,
Database: config.InfluxConnectionInformation.Database,
})
if response.Error() != nil {
panic(response.Error())
}
return convertResults(response
func LastMinutesTweets() ([]InfluxValue, error) {
queryString := "SELECT SUM('value') FROM 'tweets'"
queryString += " WHERE hashtag = '" + config.E["HASHTAG"] + "'"
queryString += " AND time >= now() - 60m"
queryString += " GROUP BY time(1m) fill(none) ORDER BY time DESC LIMIT 60"
return executeQuery(queryString)
}
func executeQuery(queryString string) ([]InfluxValue, error) {
client, _ := influx.NewHTTPClient(influx.HTTPConfig{
Addr: config.InfluxConnectionInformation.Host,
Username: config.InfluxConnectionInformation.User,
Password: config.InfluxConnectionInformation.Password,
})
defer client.Close()
response, _ := client.Query(influx.Query{
Command: queryString,
Database: config.InfluxConnectionInformation.Database,
})
if response.Error() != nil {
panic(response.Error())
}
return convertResults(response
Avec la puissance de Go et InfluxDB combinées, nous montrons à quel point il est facile de gérer des données de séries temporelles et de les afficher en temps réel sur une page web. En utilisant Scalingo, une telle application peut être mise en ligne en quelques minutes.